xref: /linux/fs/dlm/lock.c (revision 0b0e9cdb3d1f8fda823b592b0667b4b0595e2ba7)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
6 **
7 **
8 *******************************************************************************
9 ******************************************************************************/
10 
11 /* Central locking logic has four stages:
12 
13    dlm_lock()
14    dlm_unlock()
15 
16    request_lock(ls, lkb)
17    convert_lock(ls, lkb)
18    unlock_lock(ls, lkb)
19    cancel_lock(ls, lkb)
20 
21    _request_lock(r, lkb)
22    _convert_lock(r, lkb)
23    _unlock_lock(r, lkb)
24    _cancel_lock(r, lkb)
25 
26    do_request(r, lkb)
27    do_convert(r, lkb)
28    do_unlock(r, lkb)
29    do_cancel(r, lkb)
30 
31    Stage 1 (lock, unlock) is mainly about checking input args and
32    splitting into one of the four main operations:
33 
34        dlm_lock          = request_lock
35        dlm_lock+CONVERT  = convert_lock
36        dlm_unlock        = unlock_lock
37        dlm_unlock+CANCEL = cancel_lock
38 
39    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
40    provided to the next stage.
41 
42    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
43    When remote, it calls send_xxxx(), when local it calls do_xxxx().
44 
45    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
46    given rsb and lkb and queues callbacks.
47 
48    For remote operations, send_xxxx() results in the corresponding do_xxxx()
49    function being executed on the remote node.  The connecting send/receive
50    calls on local (L) and remote (R) nodes:
51 
52    L: send_xxxx()              ->  R: receive_xxxx()
53                                    R: do_xxxx()
54    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
55 */
56 #include <trace/events/dlm.h>
57 
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "midcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77 
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89 				    const struct dlm_message *ms, bool local);
90 static int receive_extralen(const struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void deactivate_rsb(struct kref *kref);
93 
94 /*
95  * Lock compatibilty matrix - thanks Steve
96  * UN = Unlocked state. Not really a state, used as a flag
97  * PD = Padding. Used to make the matrix a nice power of two in size
98  * Other states are the same as the VMS DLM.
99  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
100  */
101 
102 static const int __dlm_compat_matrix[8][8] = {
103       /* UN NL CR CW PR PW EX PD */
104         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
105         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
106         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
107         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
108         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
109         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
110         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
111         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
112 };
113 
114 /*
115  * This defines the direction of transfer of LVB data.
116  * Granted mode is the row; requested mode is the column.
117  * Usage: matrix[grmode+1][rqmode+1]
118  * 1 = LVB is returned to the caller
119  * 0 = LVB is written to the resource
120  * -1 = nothing happens to the LVB
121  */
122 
123 const int dlm_lvb_operations[8][8] = {
124         /* UN   NL  CR  CW  PR  PW  EX  PD*/
125         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
126         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
127         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
128         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
129         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
130         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
131         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
132         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
133 };
134 
135 #define modes_compat(gr, rq) \
136 	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
137 
138 int dlm_modes_compat(int mode1, int mode2)
139 {
140 	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
141 }
142 
143 /*
144  * Compatibility matrix for conversions with QUECVT set.
145  * Granted mode is the row; requested mode is the column.
146  * Usage: matrix[grmode+1][rqmode+1]
147  */
148 
149 static const int __quecvt_compat_matrix[8][8] = {
150       /* UN NL CR CW PR PW EX PD */
151         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
152         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
153         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
154         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
155         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
156         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
157         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
158         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
159 };
160 
161 void dlm_print_lkb(struct dlm_lkb *lkb)
162 {
163 	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
164 	       "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
165 	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166 	       dlm_iflags_val(lkb), lkb->lkb_status, lkb->lkb_rqmode,
167 	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
168 	       (unsigned long long)lkb->lkb_recover_seq);
169 }
170 
171 static void dlm_print_rsb(struct dlm_rsb *r)
172 {
173 	printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
174 	       "rlc %d name %s\n",
175 	       r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
176 	       r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
177 	       r->res_name);
178 }
179 
180 void dlm_dump_rsb(struct dlm_rsb *r)
181 {
182 	struct dlm_lkb *lkb;
183 
184 	dlm_print_rsb(r);
185 
186 	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
187 	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
188 	printk(KERN_ERR "rsb lookup list\n");
189 	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
190 		dlm_print_lkb(lkb);
191 	printk(KERN_ERR "rsb grant queue:\n");
192 	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
193 		dlm_print_lkb(lkb);
194 	printk(KERN_ERR "rsb convert queue:\n");
195 	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
196 		dlm_print_lkb(lkb);
197 	printk(KERN_ERR "rsb wait queue:\n");
198 	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
199 		dlm_print_lkb(lkb);
200 }
201 
202 /* Threads cannot use the lockspace while it's being recovered */
203 
204 void dlm_lock_recovery(struct dlm_ls *ls)
205 {
206 	down_read(&ls->ls_in_recovery);
207 }
208 
209 void dlm_unlock_recovery(struct dlm_ls *ls)
210 {
211 	up_read(&ls->ls_in_recovery);
212 }
213 
214 int dlm_lock_recovery_try(struct dlm_ls *ls)
215 {
216 	return down_read_trylock(&ls->ls_in_recovery);
217 }
218 
219 static inline int can_be_queued(struct dlm_lkb *lkb)
220 {
221 	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
222 }
223 
224 static inline int force_blocking_asts(struct dlm_lkb *lkb)
225 {
226 	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
227 }
228 
229 static inline int is_demoted(struct dlm_lkb *lkb)
230 {
231 	return test_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
232 }
233 
234 static inline int is_altmode(struct dlm_lkb *lkb)
235 {
236 	return test_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
237 }
238 
239 static inline int is_granted(struct dlm_lkb *lkb)
240 {
241 	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
242 }
243 
244 static inline int is_remote(struct dlm_rsb *r)
245 {
246 	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
247 	return !!r->res_nodeid;
248 }
249 
250 static inline int is_process_copy(struct dlm_lkb *lkb)
251 {
252 	return lkb->lkb_nodeid &&
253 	       !test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
254 }
255 
256 static inline int is_master_copy(struct dlm_lkb *lkb)
257 {
258 	return test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
259 }
260 
261 static inline int middle_conversion(struct dlm_lkb *lkb)
262 {
263 	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264 	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265 		return 1;
266 	return 0;
267 }
268 
269 static inline int down_conversion(struct dlm_lkb *lkb)
270 {
271 	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272 }
273 
274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
275 {
276 	return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
277 }
278 
279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
280 {
281 	return test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
282 }
283 
284 static inline int is_overlap(struct dlm_lkb *lkb)
285 {
286 	return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags) ||
287 	       test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
288 }
289 
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291 {
292 	if (is_master_copy(lkb))
293 		return;
294 
295 	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
296 
297 	if (rv == -DLM_ECANCEL &&
298 	    test_and_clear_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags))
299 		rv = -EDEADLK;
300 
301 	dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, dlm_sbflags_val(lkb));
302 }
303 
304 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
305 {
306 	queue_cast(r, lkb,
307 		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
308 }
309 
310 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
311 {
312 	if (is_master_copy(lkb)) {
313 		send_bast(r, lkb, rqmode);
314 	} else {
315 		dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
316 	}
317 }
318 
319 /*
320  * Basic operations on rsb's and lkb's
321  */
322 
323 static inline unsigned long rsb_toss_jiffies(void)
324 {
325 	return jiffies + (READ_ONCE(dlm_config.ci_toss_secs) * HZ);
326 }
327 
328 /* This is only called to add a reference when the code already holds
329    a valid reference to the rsb, so there's no need for locking. */
330 
331 static inline void hold_rsb(struct dlm_rsb *r)
332 {
333 	/* inactive rsbs are not ref counted */
334 	WARN_ON(rsb_flag(r, RSB_INACTIVE));
335 	kref_get(&r->res_ref);
336 }
337 
338 void dlm_hold_rsb(struct dlm_rsb *r)
339 {
340 	hold_rsb(r);
341 }
342 
343 /* TODO move this to lib/refcount.c */
344 static __must_check bool
345 dlm_refcount_dec_and_write_lock_bh(refcount_t *r, rwlock_t *lock)
346 __cond_acquires(lock)
347 {
348 	if (refcount_dec_not_one(r))
349 		return false;
350 
351 	write_lock_bh(lock);
352 	if (!refcount_dec_and_test(r)) {
353 		write_unlock_bh(lock);
354 		return false;
355 	}
356 
357 	return true;
358 }
359 
360 /* TODO move this to include/linux/kref.h */
361 static inline int dlm_kref_put_write_lock_bh(struct kref *kref,
362 					     void (*release)(struct kref *kref),
363 					     rwlock_t *lock)
364 {
365 	if (dlm_refcount_dec_and_write_lock_bh(&kref->refcount, lock)) {
366 		release(kref);
367 		return 1;
368 	}
369 
370 	return 0;
371 }
372 
373 static void put_rsb(struct dlm_rsb *r)
374 {
375 	struct dlm_ls *ls = r->res_ls;
376 	int rv;
377 
378 	rv = dlm_kref_put_write_lock_bh(&r->res_ref, deactivate_rsb,
379 					&ls->ls_rsbtbl_lock);
380 	if (rv)
381 		write_unlock_bh(&ls->ls_rsbtbl_lock);
382 }
383 
384 void dlm_put_rsb(struct dlm_rsb *r)
385 {
386 	put_rsb(r);
387 }
388 
389 /* connected with timer_delete_sync() in dlm_ls_stop() to stop
390  * new timers when recovery is triggered and don't run them
391  * again until a resume_scan_timer() tries it again.
392  */
393 static void enable_scan_timer(struct dlm_ls *ls, unsigned long jiffies)
394 {
395 	if (!dlm_locking_stopped(ls))
396 		mod_timer(&ls->ls_scan_timer, jiffies);
397 }
398 
399 /* This function tries to resume the timer callback if a rsb
400  * is on the scan list and no timer is pending. It might that
401  * the first entry is on currently executed as timer callback
402  * but we don't care if a timer queued up again and does
403  * nothing. Should be a rare case.
404  */
405 void resume_scan_timer(struct dlm_ls *ls)
406 {
407 	struct dlm_rsb *r;
408 
409 	spin_lock_bh(&ls->ls_scan_lock);
410 	r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
411 				     res_scan_list);
412 	if (r && !timer_pending(&ls->ls_scan_timer))
413 		enable_scan_timer(ls, r->res_toss_time);
414 	spin_unlock_bh(&ls->ls_scan_lock);
415 }
416 
417 /* ls_rsbtbl_lock must be held */
418 
419 static void del_scan(struct dlm_ls *ls, struct dlm_rsb *r)
420 {
421 	struct dlm_rsb *first;
422 
423 	/* active rsbs should never be on the scan list */
424 	WARN_ON(!rsb_flag(r, RSB_INACTIVE));
425 
426 	spin_lock_bh(&ls->ls_scan_lock);
427 	r->res_toss_time = 0;
428 
429 	/* if the rsb is not queued do nothing */
430 	if (list_empty(&r->res_scan_list))
431 		goto out;
432 
433 	/* get the first element before delete */
434 	first = list_first_entry(&ls->ls_scan_list, struct dlm_rsb,
435 				 res_scan_list);
436 	list_del_init(&r->res_scan_list);
437 	/* check if the first element was the rsb we deleted */
438 	if (first == r) {
439 		/* try to get the new first element, if the list
440 		 * is empty now try to delete the timer, if we are
441 		 * too late we don't care.
442 		 *
443 		 * if the list isn't empty and a new first element got
444 		 * in place, set the new timer expire time.
445 		 */
446 		first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
447 						 res_scan_list);
448 		if (!first)
449 			timer_delete(&ls->ls_scan_timer);
450 		else
451 			enable_scan_timer(ls, first->res_toss_time);
452 	}
453 
454 out:
455 	spin_unlock_bh(&ls->ls_scan_lock);
456 }
457 
458 static void add_scan(struct dlm_ls *ls, struct dlm_rsb *r)
459 {
460 	int our_nodeid = dlm_our_nodeid();
461 	struct dlm_rsb *first;
462 
463 	/* A dir record for a remote master rsb should never be on the scan list. */
464 	WARN_ON(!dlm_no_directory(ls) &&
465 		(r->res_master_nodeid != our_nodeid) &&
466 		(dlm_dir_nodeid(r) == our_nodeid));
467 
468 	/* An active rsb should never be on the scan list. */
469 	WARN_ON(!rsb_flag(r, RSB_INACTIVE));
470 
471 	/* An rsb should not already be on the scan list. */
472 	WARN_ON(!list_empty(&r->res_scan_list));
473 
474 	spin_lock_bh(&ls->ls_scan_lock);
475 	/* set the new rsb absolute expire time in the rsb */
476 	r->res_toss_time = rsb_toss_jiffies();
477 	if (list_empty(&ls->ls_scan_list)) {
478 		/* if the queue is empty add the element and it's
479 		 * our new expire time
480 		 */
481 		list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
482 		enable_scan_timer(ls, r->res_toss_time);
483 	} else {
484 		/* try to get the maybe new first element and then add
485 		 * to this rsb with the oldest expire time to the end
486 		 * of the queue. If the list was empty before this
487 		 * rsb expire time is our next expiration if it wasn't
488 		 * the now new first elemet is our new expiration time
489 		 */
490 		first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
491 						 res_scan_list);
492 		list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
493 		if (!first)
494 			enable_scan_timer(ls, r->res_toss_time);
495 		else
496 			enable_scan_timer(ls, first->res_toss_time);
497 	}
498 	spin_unlock_bh(&ls->ls_scan_lock);
499 }
500 
501 /* if we hit contention we do in 250 ms a retry to trylock.
502  * if there is any other mod_timer in between we don't care
503  * about that it expires earlier again this is only for the
504  * unlikely case nothing happened in this time.
505  */
506 #define DLM_TOSS_TIMER_RETRY	(jiffies + msecs_to_jiffies(250))
507 
508 /* Called by lockspace scan_timer to free unused rsb's. */
509 
510 void dlm_rsb_scan(struct timer_list *timer)
511 {
512 	struct dlm_ls *ls = from_timer(ls, timer, ls_scan_timer);
513 	int our_nodeid = dlm_our_nodeid();
514 	struct dlm_rsb *r;
515 	int rv;
516 
517 	while (1) {
518 		/* interrupting point to leave iteration when
519 		 * recovery waits for timer_delete_sync(), recovery
520 		 * will take care to delete everything in scan list.
521 		 */
522 		if (dlm_locking_stopped(ls))
523 			break;
524 
525 		rv = spin_trylock(&ls->ls_scan_lock);
526 		if (!rv) {
527 			/* rearm again try timer */
528 			enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
529 			break;
530 		}
531 
532 		r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
533 					     res_scan_list);
534 		if (!r) {
535 			/* the next add_scan will enable the timer again */
536 			spin_unlock(&ls->ls_scan_lock);
537 			break;
538 		}
539 
540 		/*
541 		 * If the first rsb is not yet expired, then stop because the
542 		 * list is sorted with nearest expiration first.
543 		 */
544 		if (time_before(jiffies, r->res_toss_time)) {
545 			/* rearm with the next rsb to expire in the future */
546 			enable_scan_timer(ls, r->res_toss_time);
547 			spin_unlock(&ls->ls_scan_lock);
548 			break;
549 		}
550 
551 		/* in find_rsb_dir/nodir there is a reverse order of this
552 		 * lock, however this is only a trylock if we hit some
553 		 * possible contention we try it again.
554 		 */
555 		rv = write_trylock(&ls->ls_rsbtbl_lock);
556 		if (!rv) {
557 			spin_unlock(&ls->ls_scan_lock);
558 			/* rearm again try timer */
559 			enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
560 			break;
561 		}
562 
563 		list_del(&r->res_slow_list);
564 		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
565 				       dlm_rhash_rsb_params);
566 		rsb_clear_flag(r, RSB_HASHED);
567 
568 		/* ls_rsbtbl_lock is not needed when calling send_remove() */
569 		write_unlock(&ls->ls_rsbtbl_lock);
570 
571 		list_del_init(&r->res_scan_list);
572 		spin_unlock(&ls->ls_scan_lock);
573 
574 		/* An rsb that is a dir record for a remote master rsb
575 		 * cannot be removed, and should not have a timer enabled.
576 		 */
577 		WARN_ON(!dlm_no_directory(ls) &&
578 			(r->res_master_nodeid != our_nodeid) &&
579 			(dlm_dir_nodeid(r) == our_nodeid));
580 
581 		/* We're the master of this rsb but we're not
582 		 * the directory record, so we need to tell the
583 		 * dir node to remove the dir record
584 		 */
585 		if (!dlm_no_directory(ls) &&
586 		    (r->res_master_nodeid == our_nodeid) &&
587 		    (dlm_dir_nodeid(r) != our_nodeid))
588 			send_remove(r);
589 
590 		free_inactive_rsb(r);
591 	}
592 }
593 
594 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
595    unlock any spinlocks, go back and call pre_rsb_struct again.
596    Otherwise, take an rsb off the list and return it. */
597 
598 static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
599 			  struct dlm_rsb **r_ret)
600 {
601 	struct dlm_rsb *r;
602 
603 	r = dlm_allocate_rsb(ls);
604 	if (!r)
605 		return -ENOMEM;
606 
607 	r->res_ls = ls;
608 	r->res_length = len;
609 	memcpy(r->res_name, name, len);
610 	spin_lock_init(&r->res_lock);
611 
612 	INIT_LIST_HEAD(&r->res_lookup);
613 	INIT_LIST_HEAD(&r->res_grantqueue);
614 	INIT_LIST_HEAD(&r->res_convertqueue);
615 	INIT_LIST_HEAD(&r->res_waitqueue);
616 	INIT_LIST_HEAD(&r->res_root_list);
617 	INIT_LIST_HEAD(&r->res_scan_list);
618 	INIT_LIST_HEAD(&r->res_recover_list);
619 	INIT_LIST_HEAD(&r->res_masters_list);
620 
621 	*r_ret = r;
622 	return 0;
623 }
624 
625 int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
626 			struct dlm_rsb **r_ret)
627 {
628 	char key[DLM_RESNAME_MAXLEN] = {};
629 
630 	memcpy(key, name, len);
631 	*r_ret = rhashtable_lookup_fast(rhash, &key, dlm_rhash_rsb_params);
632 	if (*r_ret)
633 		return 0;
634 
635 	return -EBADR;
636 }
637 
638 static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
639 {
640 	int rv;
641 
642 	rv = rhashtable_insert_fast(rhash, &rsb->res_node,
643 				    dlm_rhash_rsb_params);
644 	if (!rv)
645 		rsb_set_flag(rsb, RSB_HASHED);
646 
647 	return rv;
648 }
649 
650 /*
651  * Find rsb in rsbtbl and potentially create/add one
652  *
653  * Delaying the release of rsb's has a similar benefit to applications keeping
654  * NL locks on an rsb, but without the guarantee that the cached master value
655  * will still be valid when the rsb is reused.  Apps aren't always smart enough
656  * to keep NL locks on an rsb that they may lock again shortly; this can lead
657  * to excessive master lookups and removals if we don't delay the release.
658  *
659  * Searching for an rsb means looking through both the normal list and toss
660  * list.  When found on the toss list the rsb is moved to the normal list with
661  * ref count of 1; when found on normal list the ref count is incremented.
662  *
663  * rsb's on the keep list are being used locally and refcounted.
664  * rsb's on the toss list are not being used locally, and are not refcounted.
665  *
666  * The toss list rsb's were either
667  * - previously used locally but not any more (were on keep list, then
668  *   moved to toss list when last refcount dropped)
669  * - created and put on toss list as a directory record for a lookup
670  *   (we are the dir node for the res, but are not using the res right now,
671  *   but some other node is)
672  *
673  * The purpose of find_rsb() is to return a refcounted rsb for local use.
674  * So, if the given rsb is on the toss list, it is moved to the keep list
675  * before being returned.
676  *
677  * deactivate_rsb() happens when all local usage of the rsb is done, i.e. no
678  * more refcounts exist, so the rsb is moved from the keep list to the
679  * toss list.
680  *
681  * rsb's on both keep and toss lists are used for doing a name to master
682  * lookups.  rsb's that are in use locally (and being refcounted) are on
683  * the keep list, rsb's that are not in use locally (not refcounted) and
684  * only exist for name/master lookups are on the toss list.
685  *
686  * rsb's on the toss list who's dir_nodeid is not local can have stale
687  * name/master mappings.  So, remote requests on such rsb's can potentially
688  * return with an error, which means the mapping is stale and needs to
689  * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
690  * first_lkid is to keep only a single outstanding request on an rsb
691  * while that rsb has a potentially stale master.)
692  */
693 
694 static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
695 			uint32_t hash, int dir_nodeid, int from_nodeid,
696 			unsigned int flags, struct dlm_rsb **r_ret)
697 {
698 	struct dlm_rsb *r = NULL;
699 	int our_nodeid = dlm_our_nodeid();
700 	int from_local = 0;
701 	int from_other = 0;
702 	int from_dir = 0;
703 	int create = 0;
704 	int error;
705 
706 	if (flags & R_RECEIVE_REQUEST) {
707 		if (from_nodeid == dir_nodeid)
708 			from_dir = 1;
709 		else
710 			from_other = 1;
711 	} else if (flags & R_REQUEST) {
712 		from_local = 1;
713 	}
714 
715 	/*
716 	 * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
717 	 * from_nodeid has sent us a lock in dlm_recover_locks, believing
718 	 * we're the new master.  Our local recovery may not have set
719 	 * res_master_nodeid to our_nodeid yet, so allow either.  Don't
720 	 * create the rsb; dlm_recover_process_copy() will handle EBADR
721 	 * by resending.
722 	 *
723 	 * If someone sends us a request, we are the dir node, and we do
724 	 * not find the rsb anywhere, then recreate it.  This happens if
725 	 * someone sends us a request after we have removed/freed an rsb.
726 	 * (They sent a request instead of lookup because they are using
727 	 * an rsb taken from their scan list.)
728 	 */
729 
730 	if (from_local || from_dir ||
731 	    (from_other && (dir_nodeid == our_nodeid))) {
732 		create = 1;
733 	}
734 
735  retry:
736 
737 	/* check if the rsb is active under read lock - likely path */
738 	read_lock_bh(&ls->ls_rsbtbl_lock);
739 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
740 	if (error) {
741 		read_unlock_bh(&ls->ls_rsbtbl_lock);
742 		goto do_new;
743 	}
744 
745 	/*
746 	 * rsb is active, so we can't check master_nodeid without lock_rsb.
747 	 */
748 
749 	if (rsb_flag(r, RSB_INACTIVE)) {
750 		read_unlock_bh(&ls->ls_rsbtbl_lock);
751 		goto do_inactive;
752 	}
753 
754 	kref_get(&r->res_ref);
755 	read_unlock_bh(&ls->ls_rsbtbl_lock);
756 	goto out;
757 
758 
759  do_inactive:
760 	write_lock_bh(&ls->ls_rsbtbl_lock);
761 
762 	/*
763 	 * The expectation here is that the rsb will have HASHED and
764 	 * INACTIVE flags set, and that the rsb can be moved from
765 	 * inactive back to active again.  However, between releasing
766 	 * the read lock and acquiring the write lock, this rsb could
767 	 * have been removed from rsbtbl, and had HASHED cleared, to
768 	 * be freed.  To deal with this case, we would normally need
769 	 * to repeat dlm_search_rsb_tree while holding the write lock,
770 	 * but rcu allows us to simply check the HASHED flag, because
771 	 * the rcu read lock means the rsb will not be freed yet.
772 	 * If the HASHED flag is not set, then the rsb is being freed,
773 	 * so we add a new rsb struct.  If the HASHED flag is set,
774 	 * and INACTIVE is not set, it means another thread has
775 	 * made the rsb active, as we're expecting to do here, and
776 	 * we just repeat the lookup (this will be very unlikely.)
777 	 */
778 	if (rsb_flag(r, RSB_HASHED)) {
779 		if (!rsb_flag(r, RSB_INACTIVE)) {
780 			write_unlock_bh(&ls->ls_rsbtbl_lock);
781 			goto retry;
782 		}
783 	} else {
784 		write_unlock_bh(&ls->ls_rsbtbl_lock);
785 		goto do_new;
786 	}
787 
788 	/*
789 	 * rsb found inactive (master_nodeid may be out of date unless
790 	 * we are the dir_nodeid or were the master)  No other thread
791 	 * is using this rsb because it's inactive, so we can
792 	 * look at or update res_master_nodeid without lock_rsb.
793 	 */
794 
795 	if ((r->res_master_nodeid != our_nodeid) && from_other) {
796 		/* our rsb was not master, and another node (not the dir node)
797 		   has sent us a request */
798 		log_debug(ls, "find_rsb inactive from_other %d master %d dir %d %s",
799 			  from_nodeid, r->res_master_nodeid, dir_nodeid,
800 			  r->res_name);
801 		write_unlock_bh(&ls->ls_rsbtbl_lock);
802 		error = -ENOTBLK;
803 		goto out;
804 	}
805 
806 	if ((r->res_master_nodeid != our_nodeid) && from_dir) {
807 		/* don't think this should ever happen */
808 		log_error(ls, "find_rsb inactive from_dir %d master %d",
809 			  from_nodeid, r->res_master_nodeid);
810 		dlm_print_rsb(r);
811 		/* fix it and go on */
812 		r->res_master_nodeid = our_nodeid;
813 		r->res_nodeid = 0;
814 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
815 		r->res_first_lkid = 0;
816 	}
817 
818 	if (from_local && (r->res_master_nodeid != our_nodeid)) {
819 		/* Because we have held no locks on this rsb,
820 		   res_master_nodeid could have become stale. */
821 		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
822 		r->res_first_lkid = 0;
823 	}
824 
825 	/* A dir record will not be on the scan list. */
826 	if (r->res_dir_nodeid != our_nodeid)
827 		del_scan(ls, r);
828 	list_move(&r->res_slow_list, &ls->ls_slow_active);
829 	rsb_clear_flag(r, RSB_INACTIVE);
830 	kref_init(&r->res_ref); /* ref is now used in active state */
831 	write_unlock_bh(&ls->ls_rsbtbl_lock);
832 
833 	goto out;
834 
835 
836  do_new:
837 	/*
838 	 * rsb not found
839 	 */
840 
841 	if (error == -EBADR && !create)
842 		goto out;
843 
844 	error = get_rsb_struct(ls, name, len, &r);
845 	if (WARN_ON_ONCE(error))
846 		goto out;
847 
848 	r->res_hash = hash;
849 	r->res_dir_nodeid = dir_nodeid;
850 	kref_init(&r->res_ref);
851 
852 	if (from_dir) {
853 		/* want to see how often this happens */
854 		log_debug(ls, "find_rsb new from_dir %d recreate %s",
855 			  from_nodeid, r->res_name);
856 		r->res_master_nodeid = our_nodeid;
857 		r->res_nodeid = 0;
858 		goto out_add;
859 	}
860 
861 	if (from_other && (dir_nodeid != our_nodeid)) {
862 		/* should never happen */
863 		log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
864 			  from_nodeid, dir_nodeid, our_nodeid, r->res_name);
865 		dlm_free_rsb(r);
866 		r = NULL;
867 		error = -ENOTBLK;
868 		goto out;
869 	}
870 
871 	if (from_other) {
872 		log_debug(ls, "find_rsb new from_other %d dir %d %s",
873 			  from_nodeid, dir_nodeid, r->res_name);
874 	}
875 
876 	if (dir_nodeid == our_nodeid) {
877 		/* When we are the dir nodeid, we can set the master
878 		   node immediately */
879 		r->res_master_nodeid = our_nodeid;
880 		r->res_nodeid = 0;
881 	} else {
882 		/* set_master will send_lookup to dir_nodeid */
883 		r->res_master_nodeid = 0;
884 		r->res_nodeid = -1;
885 	}
886 
887  out_add:
888 
889 	write_lock_bh(&ls->ls_rsbtbl_lock);
890 	error = rsb_insert(r, &ls->ls_rsbtbl);
891 	if (error == -EEXIST) {
892 		/* somebody else was faster and it seems the
893 		 * rsb exists now, we do a whole relookup
894 		 */
895 		write_unlock_bh(&ls->ls_rsbtbl_lock);
896 		dlm_free_rsb(r);
897 		goto retry;
898 	} else if (!error) {
899 		list_add(&r->res_slow_list, &ls->ls_slow_active);
900 	}
901 	write_unlock_bh(&ls->ls_rsbtbl_lock);
902  out:
903 	*r_ret = r;
904 	return error;
905 }
906 
907 /* During recovery, other nodes can send us new MSTCPY locks (from
908    dlm_recover_locks) before we've made ourself master (in
909    dlm_recover_masters). */
910 
911 static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
912 			  uint32_t hash, int dir_nodeid, int from_nodeid,
913 			  unsigned int flags, struct dlm_rsb **r_ret)
914 {
915 	struct dlm_rsb *r = NULL;
916 	int our_nodeid = dlm_our_nodeid();
917 	int recover = (flags & R_RECEIVE_RECOVER);
918 	int error;
919 
920  retry:
921 
922 	/* check if the rsb is in active state under read lock - likely path */
923 	read_lock_bh(&ls->ls_rsbtbl_lock);
924 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
925 	if (error) {
926 		read_unlock_bh(&ls->ls_rsbtbl_lock);
927 		goto do_new;
928 	}
929 
930 	if (rsb_flag(r, RSB_INACTIVE)) {
931 		read_unlock_bh(&ls->ls_rsbtbl_lock);
932 		goto do_inactive;
933 	}
934 
935 	/*
936 	 * rsb is active, so we can't check master_nodeid without lock_rsb.
937 	 */
938 
939 	kref_get(&r->res_ref);
940 	read_unlock_bh(&ls->ls_rsbtbl_lock);
941 
942 	goto out;
943 
944 
945  do_inactive:
946 	write_lock_bh(&ls->ls_rsbtbl_lock);
947 
948 	/* See comment in find_rsb_dir. */
949 	if (rsb_flag(r, RSB_HASHED)) {
950 		if (!rsb_flag(r, RSB_INACTIVE)) {
951 			write_unlock_bh(&ls->ls_rsbtbl_lock);
952 			goto retry;
953 		}
954 	} else {
955 		write_unlock_bh(&ls->ls_rsbtbl_lock);
956 		goto do_new;
957 	}
958 
959 
960 	/*
961 	 * rsb found inactive. No other thread is using this rsb because
962 	 * it's inactive, so we can look at or update res_master_nodeid
963 	 * without lock_rsb.
964 	 */
965 
966 	if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
967 		/* our rsb is not master, and another node has sent us a
968 		   request; this should never happen */
969 		log_error(ls, "find_rsb inactive from_nodeid %d master %d dir %d",
970 			  from_nodeid, r->res_master_nodeid, dir_nodeid);
971 		dlm_print_rsb(r);
972 		write_unlock_bh(&ls->ls_rsbtbl_lock);
973 		error = -ENOTBLK;
974 		goto out;
975 	}
976 
977 	if (!recover && (r->res_master_nodeid != our_nodeid) &&
978 	    (dir_nodeid == our_nodeid)) {
979 		/* our rsb is not master, and we are dir; may as well fix it;
980 		   this should never happen */
981 		log_error(ls, "find_rsb inactive our %d master %d dir %d",
982 			  our_nodeid, r->res_master_nodeid, dir_nodeid);
983 		dlm_print_rsb(r);
984 		r->res_master_nodeid = our_nodeid;
985 		r->res_nodeid = 0;
986 	}
987 
988 	list_move(&r->res_slow_list, &ls->ls_slow_active);
989 	rsb_clear_flag(r, RSB_INACTIVE);
990 	kref_init(&r->res_ref);
991 	del_scan(ls, r);
992 	write_unlock_bh(&ls->ls_rsbtbl_lock);
993 
994 	goto out;
995 
996 
997  do_new:
998 	/*
999 	 * rsb not found
1000 	 */
1001 
1002 	error = get_rsb_struct(ls, name, len, &r);
1003 	if (WARN_ON_ONCE(error))
1004 		goto out;
1005 
1006 	r->res_hash = hash;
1007 	r->res_dir_nodeid = dir_nodeid;
1008 	r->res_master_nodeid = dir_nodeid;
1009 	r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
1010 	kref_init(&r->res_ref);
1011 
1012 	write_lock_bh(&ls->ls_rsbtbl_lock);
1013 	error = rsb_insert(r, &ls->ls_rsbtbl);
1014 	if (error == -EEXIST) {
1015 		/* somebody else was faster and it seems the
1016 		 * rsb exists now, we do a whole relookup
1017 		 */
1018 		write_unlock_bh(&ls->ls_rsbtbl_lock);
1019 		dlm_free_rsb(r);
1020 		goto retry;
1021 	} else if (!error) {
1022 		list_add(&r->res_slow_list, &ls->ls_slow_active);
1023 	}
1024 	write_unlock_bh(&ls->ls_rsbtbl_lock);
1025 
1026  out:
1027 	*r_ret = r;
1028 	return error;
1029 }
1030 
1031 /*
1032  * rsb rcu usage
1033  *
1034  * While rcu read lock is held, the rsb cannot be freed,
1035  * which allows a lookup optimization.
1036  *
1037  * Two threads are accessing the same rsb concurrently,
1038  * the first (A) is trying to use the rsb, the second (B)
1039  * is trying to free the rsb.
1040  *
1041  * thread A                 thread B
1042  * (trying to use rsb)      (trying to free rsb)
1043  *
1044  * A1. rcu read lock
1045  * A2. rsbtbl read lock
1046  * A3. look up rsb in rsbtbl
1047  * A4. rsbtbl read unlock
1048  *                          B1. rsbtbl write lock
1049  *                          B2. look up rsb in rsbtbl
1050  *                          B3. remove rsb from rsbtbl
1051  *                          B4. clear rsb HASHED flag
1052  *                          B5. rsbtbl write unlock
1053  *                          B6. begin freeing rsb using rcu...
1054  *
1055  * (rsb is inactive, so try to make it active again)
1056  * A5. read rsb HASHED flag (safe because rsb is not freed yet)
1057  * A6. the rsb HASHED flag is not set, which it means the rsb
1058  *     is being removed from rsbtbl and freed, so don't use it.
1059  * A7. rcu read unlock
1060  *
1061  *                          B7. ...finish freeing rsb using rcu
1062  * A8. create a new rsb
1063  *
1064  * Without the rcu optimization, steps A5-8 would need to do
1065  * an extra rsbtbl lookup:
1066  * A5. rsbtbl write lock
1067  * A6. look up rsb in rsbtbl, not found
1068  * A7. rsbtbl write unlock
1069  * A8. create a new rsb
1070  */
1071 
1072 static int find_rsb(struct dlm_ls *ls, const void *name, int len,
1073 		    int from_nodeid, unsigned int flags,
1074 		    struct dlm_rsb **r_ret)
1075 {
1076 	int dir_nodeid;
1077 	uint32_t hash;
1078 	int rv;
1079 
1080 	if (len > DLM_RESNAME_MAXLEN)
1081 		return -EINVAL;
1082 
1083 	hash = jhash(name, len, 0);
1084 	dir_nodeid = dlm_hash2nodeid(ls, hash);
1085 
1086 	rcu_read_lock();
1087 	if (dlm_no_directory(ls))
1088 		rv = find_rsb_nodir(ls, name, len, hash, dir_nodeid,
1089 				      from_nodeid, flags, r_ret);
1090 	else
1091 		rv = find_rsb_dir(ls, name, len, hash, dir_nodeid,
1092 				    from_nodeid, flags, r_ret);
1093 	rcu_read_unlock();
1094 	return rv;
1095 }
1096 
1097 /* we have received a request and found that res_master_nodeid != our_nodeid,
1098    so we need to return an error or make ourself the master */
1099 
1100 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
1101 				  int from_nodeid)
1102 {
1103 	if (dlm_no_directory(ls)) {
1104 		log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
1105 			  from_nodeid, r->res_master_nodeid,
1106 			  r->res_dir_nodeid);
1107 		dlm_print_rsb(r);
1108 		return -ENOTBLK;
1109 	}
1110 
1111 	if (from_nodeid != r->res_dir_nodeid) {
1112 		/* our rsb is not master, and another node (not the dir node)
1113 	   	   has sent us a request.  this is much more common when our
1114 	   	   master_nodeid is zero, so limit debug to non-zero.  */
1115 
1116 		if (r->res_master_nodeid) {
1117 			log_debug(ls, "validate master from_other %d master %d "
1118 				  "dir %d first %x %s", from_nodeid,
1119 				  r->res_master_nodeid, r->res_dir_nodeid,
1120 				  r->res_first_lkid, r->res_name);
1121 		}
1122 		return -ENOTBLK;
1123 	} else {
1124 		/* our rsb is not master, but the dir nodeid has sent us a
1125 	   	   request; this could happen with master 0 / res_nodeid -1 */
1126 
1127 		if (r->res_master_nodeid) {
1128 			log_error(ls, "validate master from_dir %d master %d "
1129 				  "first %x %s",
1130 				  from_nodeid, r->res_master_nodeid,
1131 				  r->res_first_lkid, r->res_name);
1132 		}
1133 
1134 		r->res_master_nodeid = dlm_our_nodeid();
1135 		r->res_nodeid = 0;
1136 		return 0;
1137 	}
1138 }
1139 
1140 static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
1141 				int from_nodeid, bool is_inactive, unsigned int flags,
1142 				int *r_nodeid, int *result)
1143 {
1144 	int fix_master = (flags & DLM_LU_RECOVER_MASTER);
1145 	int from_master = (flags & DLM_LU_RECOVER_DIR);
1146 
1147 	if (r->res_dir_nodeid != our_nodeid) {
1148 		/* should not happen, but may as well fix it and carry on */
1149 		log_error(ls, "%s res_dir %d our %d %s", __func__,
1150 			  r->res_dir_nodeid, our_nodeid, r->res_name);
1151 		r->res_dir_nodeid = our_nodeid;
1152 	}
1153 
1154 	if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
1155 		/* Recovery uses this function to set a new master when
1156 		 * the previous master failed.  Setting NEW_MASTER will
1157 		 * force dlm_recover_masters to call recover_master on this
1158 		 * rsb even though the res_nodeid is no longer removed.
1159 		 */
1160 
1161 		r->res_master_nodeid = from_nodeid;
1162 		r->res_nodeid = from_nodeid;
1163 		rsb_set_flag(r, RSB_NEW_MASTER);
1164 
1165 		if (is_inactive) {
1166 			/* I don't think we should ever find it inactive. */
1167 			log_error(ls, "%s fix_master inactive", __func__);
1168 			dlm_dump_rsb(r);
1169 		}
1170 	}
1171 
1172 	if (from_master && (r->res_master_nodeid != from_nodeid)) {
1173 		/* this will happen if from_nodeid became master during
1174 		 * a previous recovery cycle, and we aborted the previous
1175 		 * cycle before recovering this master value
1176 		 */
1177 
1178 		log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
1179 			  __func__, from_nodeid, r->res_master_nodeid,
1180 			  r->res_nodeid, r->res_first_lkid, r->res_name);
1181 
1182 		if (r->res_master_nodeid == our_nodeid) {
1183 			log_error(ls, "from_master %d our_master", from_nodeid);
1184 			dlm_dump_rsb(r);
1185 			goto ret_assign;
1186 		}
1187 
1188 		r->res_master_nodeid = from_nodeid;
1189 		r->res_nodeid = from_nodeid;
1190 		rsb_set_flag(r, RSB_NEW_MASTER);
1191 	}
1192 
1193 	if (!r->res_master_nodeid) {
1194 		/* this will happen if recovery happens while we're looking
1195 		 * up the master for this rsb
1196 		 */
1197 
1198 		log_debug(ls, "%s master 0 to %d first %x %s", __func__,
1199 			  from_nodeid, r->res_first_lkid, r->res_name);
1200 		r->res_master_nodeid = from_nodeid;
1201 		r->res_nodeid = from_nodeid;
1202 	}
1203 
1204 	if (!from_master && !fix_master &&
1205 	    (r->res_master_nodeid == from_nodeid)) {
1206 		/* this can happen when the master sends remove, the dir node
1207 		 * finds the rsb on the active list and ignores the remove,
1208 		 * and the former master sends a lookup
1209 		 */
1210 
1211 		log_limit(ls, "%s from master %d flags %x first %x %s",
1212 			  __func__, from_nodeid, flags, r->res_first_lkid,
1213 			  r->res_name);
1214 	}
1215 
1216  ret_assign:
1217 	*r_nodeid = r->res_master_nodeid;
1218 	if (result)
1219 		*result = DLM_LU_MATCH;
1220 }
1221 
1222 /*
1223  * We're the dir node for this res and another node wants to know the
1224  * master nodeid.  During normal operation (non recovery) this is only
1225  * called from receive_lookup(); master lookups when the local node is
1226  * the dir node are done by find_rsb().
1227  *
1228  * normal operation, we are the dir node for a resource
1229  * . _request_lock
1230  * . set_master
1231  * . send_lookup
1232  * . receive_lookup
1233  * . dlm_master_lookup flags 0
1234  *
1235  * recover directory, we are rebuilding dir for all resources
1236  * . dlm_recover_directory
1237  * . dlm_rcom_names
1238  *   remote node sends back the rsb names it is master of and we are dir of
1239  * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
1240  *   we either create new rsb setting remote node as master, or find existing
1241  *   rsb and set master to be the remote node.
1242  *
1243  * recover masters, we are finding the new master for resources
1244  * . dlm_recover_masters
1245  * . recover_master
1246  * . dlm_send_rcom_lookup
1247  * . receive_rcom_lookup
1248  * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
1249  */
1250 
1251 static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
1252 			      int len, unsigned int flags, int *r_nodeid, int *result)
1253 {
1254 	struct dlm_rsb *r = NULL;
1255 	uint32_t hash;
1256 	int our_nodeid = dlm_our_nodeid();
1257 	int dir_nodeid, error;
1258 
1259 	if (len > DLM_RESNAME_MAXLEN)
1260 		return -EINVAL;
1261 
1262 	if (from_nodeid == our_nodeid) {
1263 		log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
1264 			  our_nodeid, flags);
1265 		return -EINVAL;
1266 	}
1267 
1268 	hash = jhash(name, len, 0);
1269 	dir_nodeid = dlm_hash2nodeid(ls, hash);
1270 	if (dir_nodeid != our_nodeid) {
1271 		log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
1272 			  from_nodeid, dir_nodeid, our_nodeid, hash,
1273 			  ls->ls_num_nodes);
1274 		*r_nodeid = -1;
1275 		return -EINVAL;
1276 	}
1277 
1278  retry:
1279 
1280 	/* check if the rsb is active under read lock - likely path */
1281 	read_lock_bh(&ls->ls_rsbtbl_lock);
1282 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1283 	if (!error) {
1284 		if (rsb_flag(r, RSB_INACTIVE)) {
1285 			read_unlock_bh(&ls->ls_rsbtbl_lock);
1286 			goto do_inactive;
1287 		}
1288 
1289 		/* because the rsb is active, we need to lock_rsb before
1290 		 * checking/changing re_master_nodeid
1291 		 */
1292 
1293 		hold_rsb(r);
1294 		read_unlock_bh(&ls->ls_rsbtbl_lock);
1295 		lock_rsb(r);
1296 
1297 		__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
1298 				    flags, r_nodeid, result);
1299 
1300 		/* the rsb was active */
1301 		unlock_rsb(r);
1302 		put_rsb(r);
1303 
1304 		return 0;
1305 	} else {
1306 		read_unlock_bh(&ls->ls_rsbtbl_lock);
1307 		goto not_found;
1308 	}
1309 
1310  do_inactive:
1311 	/* unlikely path - relookup under write */
1312 	write_lock_bh(&ls->ls_rsbtbl_lock);
1313 
1314 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1315 	if (!error) {
1316 		if (!rsb_flag(r, RSB_INACTIVE)) {
1317 			write_unlock_bh(&ls->ls_rsbtbl_lock);
1318 			/* something as changed, very unlikely but
1319 			 * try again
1320 			 */
1321 			goto retry;
1322 		}
1323 	} else {
1324 		write_unlock_bh(&ls->ls_rsbtbl_lock);
1325 		goto not_found;
1326 	}
1327 
1328 	/* because the rsb is inactive, it's not refcounted and lock_rsb
1329 	   is not used, but is protected by the rsbtbl lock */
1330 
1331 	__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
1332 			    r_nodeid, result);
1333 
1334 	/* A dir record rsb should never be on scan list. */
1335 	/* Try to fix this with del_scan? */
1336 	WARN_ON(!list_empty(&r->res_scan_list));
1337 
1338 	write_unlock_bh(&ls->ls_rsbtbl_lock);
1339 
1340 	return 0;
1341 
1342  not_found:
1343 	error = get_rsb_struct(ls, name, len, &r);
1344 	if (WARN_ON_ONCE(error))
1345 		goto out;
1346 
1347 	r->res_hash = hash;
1348 	r->res_dir_nodeid = our_nodeid;
1349 	r->res_master_nodeid = from_nodeid;
1350 	r->res_nodeid = from_nodeid;
1351 	rsb_set_flag(r, RSB_INACTIVE);
1352 
1353 	write_lock_bh(&ls->ls_rsbtbl_lock);
1354 	error = rsb_insert(r, &ls->ls_rsbtbl);
1355 	if (error == -EEXIST) {
1356 		/* somebody else was faster and it seems the
1357 		 * rsb exists now, we do a whole relookup
1358 		 */
1359 		write_unlock_bh(&ls->ls_rsbtbl_lock);
1360 		dlm_free_rsb(r);
1361 		goto retry;
1362 	} else if (error) {
1363 		write_unlock_bh(&ls->ls_rsbtbl_lock);
1364 		/* should never happen */
1365 		dlm_free_rsb(r);
1366 		goto retry;
1367 	}
1368 
1369 	list_add(&r->res_slow_list, &ls->ls_slow_inactive);
1370 	write_unlock_bh(&ls->ls_rsbtbl_lock);
1371 
1372 	if (result)
1373 		*result = DLM_LU_ADD;
1374 	*r_nodeid = from_nodeid;
1375  out:
1376 	return error;
1377 }
1378 
1379 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
1380 		      int len, unsigned int flags, int *r_nodeid, int *result)
1381 {
1382 	int rv;
1383 	rcu_read_lock();
1384 	rv = _dlm_master_lookup(ls, from_nodeid, name, len, flags, r_nodeid, result);
1385 	rcu_read_unlock();
1386 	return rv;
1387 }
1388 
1389 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1390 {
1391 	struct dlm_rsb *r;
1392 
1393 	read_lock_bh(&ls->ls_rsbtbl_lock);
1394 	list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
1395 		if (r->res_hash == hash)
1396 			dlm_dump_rsb(r);
1397 	}
1398 	read_unlock_bh(&ls->ls_rsbtbl_lock);
1399 }
1400 
1401 void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
1402 {
1403 	struct dlm_rsb *r = NULL;
1404 	int error;
1405 
1406 	read_lock_bh(&ls->ls_rsbtbl_lock);
1407 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1408 	if (!error)
1409 		goto out;
1410 
1411 	dlm_dump_rsb(r);
1412  out:
1413 	read_unlock_bh(&ls->ls_rsbtbl_lock);
1414 }
1415 
1416 static void deactivate_rsb(struct kref *kref)
1417 {
1418 	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1419 	struct dlm_ls *ls = r->res_ls;
1420 	int our_nodeid = dlm_our_nodeid();
1421 
1422 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1423 	rsb_set_flag(r, RSB_INACTIVE);
1424 	list_move(&r->res_slow_list, &ls->ls_slow_inactive);
1425 
1426 	/*
1427 	 * When the rsb becomes unused:
1428 	 * - If it's not a dir record for a remote master rsb,
1429 	 *   then it is put on the scan list to be freed.
1430 	 * - If it's a dir record for a remote master rsb,
1431 	 *   then it is kept in the inactive state until
1432 	 *   receive_remove() from the master node.
1433 	 */
1434 	if (!dlm_no_directory(ls) &&
1435 	    (r->res_master_nodeid != our_nodeid) &&
1436 	    (dlm_dir_nodeid(r) != our_nodeid))
1437 		add_scan(ls, r);
1438 
1439 	if (r->res_lvbptr) {
1440 		dlm_free_lvb(r->res_lvbptr);
1441 		r->res_lvbptr = NULL;
1442 	}
1443 }
1444 
1445 /* See comment for unhold_lkb */
1446 
1447 static void unhold_rsb(struct dlm_rsb *r)
1448 {
1449 	int rv;
1450 
1451 	/* inactive rsbs are not ref counted */
1452 	WARN_ON(rsb_flag(r, RSB_INACTIVE));
1453 	rv = kref_put(&r->res_ref, deactivate_rsb);
1454 	DLM_ASSERT(!rv, dlm_dump_rsb(r););
1455 }
1456 
1457 void free_inactive_rsb(struct dlm_rsb *r)
1458 {
1459 	WARN_ON_ONCE(!rsb_flag(r, RSB_INACTIVE));
1460 
1461 	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1462 	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1463 	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1464 	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1465 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1466 	DLM_ASSERT(list_empty(&r->res_scan_list), dlm_dump_rsb(r););
1467 	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1468 	DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r););
1469 
1470 	dlm_free_rsb(r);
1471 }
1472 
1473 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1474    The rsb must exist as long as any lkb's for it do. */
1475 
1476 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1477 {
1478 	hold_rsb(r);
1479 	lkb->lkb_resource = r;
1480 }
1481 
1482 static void detach_lkb(struct dlm_lkb *lkb)
1483 {
1484 	if (lkb->lkb_resource) {
1485 		put_rsb(lkb->lkb_resource);
1486 		lkb->lkb_resource = NULL;
1487 	}
1488 }
1489 
1490 static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
1491 		       unsigned long start, unsigned long end)
1492 {
1493 	struct xa_limit limit;
1494 	struct dlm_lkb *lkb;
1495 	int rv;
1496 
1497 	limit.max = end;
1498 	limit.min = start;
1499 
1500 	lkb = dlm_allocate_lkb(ls);
1501 	if (!lkb)
1502 		return -ENOMEM;
1503 
1504 	lkb->lkb_last_bast_cb_mode = DLM_LOCK_IV;
1505 	lkb->lkb_last_cast_cb_mode = DLM_LOCK_IV;
1506 	lkb->lkb_last_cb_mode = DLM_LOCK_IV;
1507 	lkb->lkb_nodeid = -1;
1508 	lkb->lkb_grmode = DLM_LOCK_IV;
1509 	kref_init(&lkb->lkb_ref);
1510 	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1511 	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1512 
1513 	write_lock_bh(&ls->ls_lkbxa_lock);
1514 	rv = xa_alloc(&ls->ls_lkbxa, &lkb->lkb_id, lkb, limit, GFP_ATOMIC);
1515 	write_unlock_bh(&ls->ls_lkbxa_lock);
1516 
1517 	if (rv < 0) {
1518 		log_error(ls, "create_lkb xa error %d", rv);
1519 		dlm_free_lkb(lkb);
1520 		return rv;
1521 	}
1522 
1523 	*lkb_ret = lkb;
1524 	return 0;
1525 }
1526 
1527 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1528 {
1529 	return _create_lkb(ls, lkb_ret, 1, ULONG_MAX);
1530 }
1531 
1532 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1533 {
1534 	struct dlm_lkb *lkb;
1535 
1536 	read_lock_bh(&ls->ls_lkbxa_lock);
1537 	lkb = xa_load(&ls->ls_lkbxa, lkid);
1538 	if (lkb)
1539 		kref_get(&lkb->lkb_ref);
1540 	read_unlock_bh(&ls->ls_lkbxa_lock);
1541 
1542 	*lkb_ret = lkb;
1543 	return lkb ? 0 : -ENOENT;
1544 }
1545 
1546 static void kill_lkb(struct kref *kref)
1547 {
1548 	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1549 
1550 	/* All work is done after the return from kref_put() so we
1551 	   can release the write_lock before the detach_lkb */
1552 
1553 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1554 }
1555 
1556 /* __put_lkb() is used when an lkb may not have an rsb attached to
1557    it so we need to provide the lockspace explicitly */
1558 
1559 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1560 {
1561 	uint32_t lkid = lkb->lkb_id;
1562 	int rv;
1563 
1564 	rv = dlm_kref_put_write_lock_bh(&lkb->lkb_ref, kill_lkb,
1565 					&ls->ls_lkbxa_lock);
1566 	if (rv) {
1567 		xa_erase(&ls->ls_lkbxa, lkid);
1568 		write_unlock_bh(&ls->ls_lkbxa_lock);
1569 
1570 		detach_lkb(lkb);
1571 
1572 		/* for local/process lkbs, lvbptr points to caller's lksb */
1573 		if (lkb->lkb_lvbptr && is_master_copy(lkb))
1574 			dlm_free_lvb(lkb->lkb_lvbptr);
1575 		dlm_free_lkb(lkb);
1576 	}
1577 
1578 	return rv;
1579 }
1580 
1581 int dlm_put_lkb(struct dlm_lkb *lkb)
1582 {
1583 	struct dlm_ls *ls;
1584 
1585 	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1586 	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1587 
1588 	ls = lkb->lkb_resource->res_ls;
1589 	return __put_lkb(ls, lkb);
1590 }
1591 
1592 /* This is only called to add a reference when the code already holds
1593    a valid reference to the lkb, so there's no need for locking. */
1594 
1595 static inline void hold_lkb(struct dlm_lkb *lkb)
1596 {
1597 	kref_get(&lkb->lkb_ref);
1598 }
1599 
1600 static void unhold_lkb_assert(struct kref *kref)
1601 {
1602 	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1603 
1604 	DLM_ASSERT(false, dlm_print_lkb(lkb););
1605 }
1606 
1607 /* This is called when we need to remove a reference and are certain
1608    it's not the last ref.  e.g. del_lkb is always called between a
1609    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1610    put_lkb would work fine, but would involve unnecessary locking */
1611 
1612 static inline void unhold_lkb(struct dlm_lkb *lkb)
1613 {
1614 	kref_put(&lkb->lkb_ref, unhold_lkb_assert);
1615 }
1616 
1617 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1618 			    int mode)
1619 {
1620 	struct dlm_lkb *lkb = NULL, *iter;
1621 
1622 	list_for_each_entry(iter, head, lkb_statequeue)
1623 		if (iter->lkb_rqmode < mode) {
1624 			lkb = iter;
1625 			list_add_tail(new, &iter->lkb_statequeue);
1626 			break;
1627 		}
1628 
1629 	if (!lkb)
1630 		list_add_tail(new, head);
1631 }
1632 
1633 /* add/remove lkb to rsb's grant/convert/wait queue */
1634 
1635 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1636 {
1637 	kref_get(&lkb->lkb_ref);
1638 
1639 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1640 
1641 	lkb->lkb_timestamp = ktime_get();
1642 
1643 	lkb->lkb_status = status;
1644 
1645 	switch (status) {
1646 	case DLM_LKSTS_WAITING:
1647 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1648 			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1649 		else
1650 			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1651 		break;
1652 	case DLM_LKSTS_GRANTED:
1653 		/* convention says granted locks kept in order of grmode */
1654 		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1655 				lkb->lkb_grmode);
1656 		break;
1657 	case DLM_LKSTS_CONVERT:
1658 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1659 			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1660 		else
1661 			list_add_tail(&lkb->lkb_statequeue,
1662 				      &r->res_convertqueue);
1663 		break;
1664 	default:
1665 		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1666 	}
1667 }
1668 
1669 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1670 {
1671 	lkb->lkb_status = 0;
1672 	list_del(&lkb->lkb_statequeue);
1673 	unhold_lkb(lkb);
1674 }
1675 
1676 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1677 {
1678 	hold_lkb(lkb);
1679 	del_lkb(r, lkb);
1680 	add_lkb(r, lkb, sts);
1681 	unhold_lkb(lkb);
1682 }
1683 
1684 static int msg_reply_type(int mstype)
1685 {
1686 	switch (mstype) {
1687 	case DLM_MSG_REQUEST:
1688 		return DLM_MSG_REQUEST_REPLY;
1689 	case DLM_MSG_CONVERT:
1690 		return DLM_MSG_CONVERT_REPLY;
1691 	case DLM_MSG_UNLOCK:
1692 		return DLM_MSG_UNLOCK_REPLY;
1693 	case DLM_MSG_CANCEL:
1694 		return DLM_MSG_CANCEL_REPLY;
1695 	case DLM_MSG_LOOKUP:
1696 		return DLM_MSG_LOOKUP_REPLY;
1697 	}
1698 	return -1;
1699 }
1700 
1701 /* add/remove lkb from global waiters list of lkb's waiting for
1702    a reply from a remote node */
1703 
1704 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1705 {
1706 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1707 	int error = 0;
1708 
1709 	spin_lock_bh(&ls->ls_waiters_lock);
1710 
1711 	if (is_overlap_unlock(lkb) ||
1712 	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
1713 		error = -EINVAL;
1714 		goto out;
1715 	}
1716 
1717 	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1718 		switch (mstype) {
1719 		case DLM_MSG_UNLOCK:
1720 			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
1721 			break;
1722 		case DLM_MSG_CANCEL:
1723 			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
1724 			break;
1725 		default:
1726 			error = -EBUSY;
1727 			goto out;
1728 		}
1729 		lkb->lkb_wait_count++;
1730 		hold_lkb(lkb);
1731 
1732 		log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1733 			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
1734 			  lkb->lkb_wait_count, dlm_iflags_val(lkb));
1735 		goto out;
1736 	}
1737 
1738 	DLM_ASSERT(!lkb->lkb_wait_count,
1739 		   dlm_print_lkb(lkb);
1740 		   printk("wait_count %d\n", lkb->lkb_wait_count););
1741 
1742 	lkb->lkb_wait_count++;
1743 	lkb->lkb_wait_type = mstype;
1744 	lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1745 	hold_lkb(lkb);
1746 	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1747  out:
1748 	if (error)
1749 		log_error(ls, "addwait error %x %d flags %x %d %d %s",
1750 			  lkb->lkb_id, error, dlm_iflags_val(lkb), mstype,
1751 			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1752 	spin_unlock_bh(&ls->ls_waiters_lock);
1753 	return error;
1754 }
1755 
1756 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1757    list as part of process_requestqueue (e.g. a lookup that has an optimized
1758    request reply on the requestqueue) between dlm_recover_waiters_pre() which
1759    set RESEND and dlm_recover_waiters_post() */
1760 
1761 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1762 				const struct dlm_message *ms)
1763 {
1764 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1765 	int overlap_done = 0;
1766 
1767 	if (mstype == DLM_MSG_UNLOCK_REPLY &&
1768 	    test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
1769 		log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1770 		overlap_done = 1;
1771 		goto out_del;
1772 	}
1773 
1774 	if (mstype == DLM_MSG_CANCEL_REPLY &&
1775 	    test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1776 		log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1777 		overlap_done = 1;
1778 		goto out_del;
1779 	}
1780 
1781 	/* Cancel state was preemptively cleared by a successful convert,
1782 	   see next comment, nothing to do. */
1783 
1784 	if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1785 	    (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1786 		log_debug(ls, "remwait %x cancel_reply wait_type %d",
1787 			  lkb->lkb_id, lkb->lkb_wait_type);
1788 		return -1;
1789 	}
1790 
1791 	/* Remove for the convert reply, and premptively remove for the
1792 	   cancel reply.  A convert has been granted while there's still
1793 	   an outstanding cancel on it (the cancel is moot and the result
1794 	   in the cancel reply should be 0).  We preempt the cancel reply
1795 	   because the app gets the convert result and then can follow up
1796 	   with another op, like convert.  This subsequent op would see the
1797 	   lingering state of the cancel and fail with -EBUSY. */
1798 
1799 	if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1800 	    (lkb->lkb_wait_type == DLM_MSG_CONVERT) && ms && !ms->m_result &&
1801 	    test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1802 		log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1803 			  lkb->lkb_id);
1804 		lkb->lkb_wait_type = 0;
1805 		lkb->lkb_wait_count--;
1806 		unhold_lkb(lkb);
1807 		goto out_del;
1808 	}
1809 
1810 	/* N.B. type of reply may not always correspond to type of original
1811 	   msg due to lookup->request optimization, verify others? */
1812 
1813 	if (lkb->lkb_wait_type) {
1814 		lkb->lkb_wait_type = 0;
1815 		goto out_del;
1816 	}
1817 
1818 	log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1819 		  lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
1820 		  lkb->lkb_remid, mstype, dlm_iflags_val(lkb));
1821 	return -1;
1822 
1823  out_del:
1824 	/* the force-unlock/cancel has completed and we haven't recvd a reply
1825 	   to the op that was in progress prior to the unlock/cancel; we
1826 	   give up on any reply to the earlier op.  FIXME: not sure when/how
1827 	   this would happen */
1828 
1829 	if (overlap_done && lkb->lkb_wait_type) {
1830 		log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1831 			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
1832 		lkb->lkb_wait_count--;
1833 		unhold_lkb(lkb);
1834 		lkb->lkb_wait_type = 0;
1835 	}
1836 
1837 	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1838 
1839 	clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
1840 	lkb->lkb_wait_count--;
1841 	if (!lkb->lkb_wait_count)
1842 		list_del_init(&lkb->lkb_wait_reply);
1843 	unhold_lkb(lkb);
1844 	return 0;
1845 }
1846 
1847 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1848 {
1849 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1850 	int error;
1851 
1852 	spin_lock_bh(&ls->ls_waiters_lock);
1853 	error = _remove_from_waiters(lkb, mstype, NULL);
1854 	spin_unlock_bh(&ls->ls_waiters_lock);
1855 	return error;
1856 }
1857 
1858 /* Handles situations where we might be processing a "fake" or "local" reply in
1859  * the recovery context which stops any locking activity. Only debugfs might
1860  * change the lockspace waiters but they will held the recovery lock to ensure
1861  * remove_from_waiters_ms() in local case will be the only user manipulating the
1862  * lockspace waiters in recovery context.
1863  */
1864 
1865 static int remove_from_waiters_ms(struct dlm_lkb *lkb,
1866 				  const struct dlm_message *ms, bool local)
1867 {
1868 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1869 	int error;
1870 
1871 	if (!local)
1872 		spin_lock_bh(&ls->ls_waiters_lock);
1873 	else
1874 		WARN_ON_ONCE(!rwsem_is_locked(&ls->ls_in_recovery) ||
1875 			     !dlm_locking_stopped(ls));
1876 	error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
1877 	if (!local)
1878 		spin_unlock_bh(&ls->ls_waiters_lock);
1879 	return error;
1880 }
1881 
1882 /* lkb is master or local copy */
1883 
1884 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1885 {
1886 	int b, len = r->res_ls->ls_lvblen;
1887 
1888 	/* b=1 lvb returned to caller
1889 	   b=0 lvb written to rsb or invalidated
1890 	   b=-1 do nothing */
1891 
1892 	b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1893 
1894 	if (b == 1) {
1895 		if (!lkb->lkb_lvbptr)
1896 			return;
1897 
1898 		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1899 			return;
1900 
1901 		if (!r->res_lvbptr)
1902 			return;
1903 
1904 		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1905 		lkb->lkb_lvbseq = r->res_lvbseq;
1906 
1907 	} else if (b == 0) {
1908 		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1909 			rsb_set_flag(r, RSB_VALNOTVALID);
1910 			return;
1911 		}
1912 
1913 		if (!lkb->lkb_lvbptr)
1914 			return;
1915 
1916 		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1917 			return;
1918 
1919 		if (!r->res_lvbptr)
1920 			r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1921 
1922 		if (!r->res_lvbptr)
1923 			return;
1924 
1925 		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1926 		r->res_lvbseq++;
1927 		lkb->lkb_lvbseq = r->res_lvbseq;
1928 		rsb_clear_flag(r, RSB_VALNOTVALID);
1929 	}
1930 
1931 	if (rsb_flag(r, RSB_VALNOTVALID))
1932 		set_bit(DLM_SBF_VALNOTVALID_BIT, &lkb->lkb_sbflags);
1933 }
1934 
1935 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1936 {
1937 	if (lkb->lkb_grmode < DLM_LOCK_PW)
1938 		return;
1939 
1940 	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1941 		rsb_set_flag(r, RSB_VALNOTVALID);
1942 		return;
1943 	}
1944 
1945 	if (!lkb->lkb_lvbptr)
1946 		return;
1947 
1948 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1949 		return;
1950 
1951 	if (!r->res_lvbptr)
1952 		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1953 
1954 	if (!r->res_lvbptr)
1955 		return;
1956 
1957 	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1958 	r->res_lvbseq++;
1959 	rsb_clear_flag(r, RSB_VALNOTVALID);
1960 }
1961 
1962 /* lkb is process copy (pc) */
1963 
1964 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1965 			    const struct dlm_message *ms)
1966 {
1967 	int b;
1968 
1969 	if (!lkb->lkb_lvbptr)
1970 		return;
1971 
1972 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1973 		return;
1974 
1975 	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1976 	if (b == 1) {
1977 		int len = receive_extralen(ms);
1978 		if (len > r->res_ls->ls_lvblen)
1979 			len = r->res_ls->ls_lvblen;
1980 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1981 		lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
1982 	}
1983 }
1984 
1985 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1986    remove_lock -- used for unlock, removes lkb from granted
1987    revert_lock -- used for cancel, moves lkb from convert to granted
1988    grant_lock  -- used for request and convert, adds lkb to granted or
1989                   moves lkb from convert or waiting to granted
1990 
1991    Each of these is used for master or local copy lkb's.  There is
1992    also a _pc() variation used to make the corresponding change on
1993    a process copy (pc) lkb. */
1994 
1995 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1996 {
1997 	del_lkb(r, lkb);
1998 	lkb->lkb_grmode = DLM_LOCK_IV;
1999 	/* this unhold undoes the original ref from create_lkb()
2000 	   so this leads to the lkb being freed */
2001 	unhold_lkb(lkb);
2002 }
2003 
2004 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2005 {
2006 	set_lvb_unlock(r, lkb);
2007 	_remove_lock(r, lkb);
2008 }
2009 
2010 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2011 {
2012 	_remove_lock(r, lkb);
2013 }
2014 
2015 /* returns: 0 did nothing
2016 	    1 moved lock to granted
2017 	   -1 removed lock */
2018 
2019 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2020 {
2021 	int rv = 0;
2022 
2023 	lkb->lkb_rqmode = DLM_LOCK_IV;
2024 
2025 	switch (lkb->lkb_status) {
2026 	case DLM_LKSTS_GRANTED:
2027 		break;
2028 	case DLM_LKSTS_CONVERT:
2029 		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2030 		rv = 1;
2031 		break;
2032 	case DLM_LKSTS_WAITING:
2033 		del_lkb(r, lkb);
2034 		lkb->lkb_grmode = DLM_LOCK_IV;
2035 		/* this unhold undoes the original ref from create_lkb()
2036 		   so this leads to the lkb being freed */
2037 		unhold_lkb(lkb);
2038 		rv = -1;
2039 		break;
2040 	default:
2041 		log_print("invalid status for revert %d", lkb->lkb_status);
2042 	}
2043 	return rv;
2044 }
2045 
2046 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2047 {
2048 	return revert_lock(r, lkb);
2049 }
2050 
2051 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2052 {
2053 	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2054 		lkb->lkb_grmode = lkb->lkb_rqmode;
2055 		if (lkb->lkb_status)
2056 			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2057 		else
2058 			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2059 	}
2060 
2061 	lkb->lkb_rqmode = DLM_LOCK_IV;
2062 	lkb->lkb_highbast = 0;
2063 }
2064 
2065 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2066 {
2067 	set_lvb_lock(r, lkb);
2068 	_grant_lock(r, lkb);
2069 }
2070 
2071 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2072 			  const struct dlm_message *ms)
2073 {
2074 	set_lvb_lock_pc(r, lkb, ms);
2075 	_grant_lock(r, lkb);
2076 }
2077 
2078 /* called by grant_pending_locks() which means an async grant message must
2079    be sent to the requesting node in addition to granting the lock if the
2080    lkb belongs to a remote node. */
2081 
2082 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
2083 {
2084 	grant_lock(r, lkb);
2085 	if (is_master_copy(lkb))
2086 		send_grant(r, lkb);
2087 	else
2088 		queue_cast(r, lkb, 0);
2089 }
2090 
2091 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2092    change the granted/requested modes.  We're munging things accordingly in
2093    the process copy.
2094    CONVDEADLK: our grmode may have been forced down to NL to resolve a
2095    conversion deadlock
2096    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2097    compatible with other granted locks */
2098 
2099 static void munge_demoted(struct dlm_lkb *lkb)
2100 {
2101 	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2102 		log_print("munge_demoted %x invalid modes gr %d rq %d",
2103 			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2104 		return;
2105 	}
2106 
2107 	lkb->lkb_grmode = DLM_LOCK_NL;
2108 }
2109 
2110 static void munge_altmode(struct dlm_lkb *lkb, const struct dlm_message *ms)
2111 {
2112 	if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
2113 	    ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
2114 		log_print("munge_altmode %x invalid reply type %d",
2115 			  lkb->lkb_id, le32_to_cpu(ms->m_type));
2116 		return;
2117 	}
2118 
2119 	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2120 		lkb->lkb_rqmode = DLM_LOCK_PR;
2121 	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2122 		lkb->lkb_rqmode = DLM_LOCK_CW;
2123 	else {
2124 		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2125 		dlm_print_lkb(lkb);
2126 	}
2127 }
2128 
2129 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2130 {
2131 	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2132 					   lkb_statequeue);
2133 	if (lkb->lkb_id == first->lkb_id)
2134 		return 1;
2135 
2136 	return 0;
2137 }
2138 
2139 /* Check if the given lkb conflicts with another lkb on the queue. */
2140 
2141 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2142 {
2143 	struct dlm_lkb *this;
2144 
2145 	list_for_each_entry(this, head, lkb_statequeue) {
2146 		if (this == lkb)
2147 			continue;
2148 		if (!modes_compat(this, lkb))
2149 			return 1;
2150 	}
2151 	return 0;
2152 }
2153 
2154 /*
2155  * "A conversion deadlock arises with a pair of lock requests in the converting
2156  * queue for one resource.  The granted mode of each lock blocks the requested
2157  * mode of the other lock."
2158  *
2159  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2160  * convert queue from being granted, then deadlk/demote lkb.
2161  *
2162  * Example:
2163  * Granted Queue: empty
2164  * Convert Queue: NL->EX (first lock)
2165  *                PR->EX (second lock)
2166  *
2167  * The first lock can't be granted because of the granted mode of the second
2168  * lock and the second lock can't be granted because it's not first in the
2169  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2170  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2171  * flag set and return DEMOTED in the lksb flags.
2172  *
2173  * Originally, this function detected conv-deadlk in a more limited scope:
2174  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2175  * - if lkb1 was the first entry in the queue (not just earlier), and was
2176  *   blocked by the granted mode of lkb2, and there was nothing on the
2177  *   granted queue preventing lkb1 from being granted immediately, i.e.
2178  *   lkb2 was the only thing preventing lkb1 from being granted.
2179  *
2180  * That second condition meant we'd only say there was conv-deadlk if
2181  * resolving it (by demotion) would lead to the first lock on the convert
2182  * queue being granted right away.  It allowed conversion deadlocks to exist
2183  * between locks on the convert queue while they couldn't be granted anyway.
2184  *
2185  * Now, we detect and take action on conversion deadlocks immediately when
2186  * they're created, even if they may not be immediately consequential.  If
2187  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2188  * mode that would prevent lkb1's conversion from being granted, we do a
2189  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2190  * I think this means that the lkb_is_ahead condition below should always
2191  * be zero, i.e. there will never be conv-deadlk between two locks that are
2192  * both already on the convert queue.
2193  */
2194 
2195 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2196 {
2197 	struct dlm_lkb *lkb1;
2198 	int lkb_is_ahead = 0;
2199 
2200 	list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2201 		if (lkb1 == lkb2) {
2202 			lkb_is_ahead = 1;
2203 			continue;
2204 		}
2205 
2206 		if (!lkb_is_ahead) {
2207 			if (!modes_compat(lkb2, lkb1))
2208 				return 1;
2209 		} else {
2210 			if (!modes_compat(lkb2, lkb1) &&
2211 			    !modes_compat(lkb1, lkb2))
2212 				return 1;
2213 		}
2214 	}
2215 	return 0;
2216 }
2217 
2218 /*
2219  * Return 1 if the lock can be granted, 0 otherwise.
2220  * Also detect and resolve conversion deadlocks.
2221  *
2222  * lkb is the lock to be granted
2223  *
2224  * now is 1 if the function is being called in the context of the
2225  * immediate request, it is 0 if called later, after the lock has been
2226  * queued.
2227  *
2228  * recover is 1 if dlm_recover_grant() is trying to grant conversions
2229  * after recovery.
2230  *
2231  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2232  */
2233 
2234 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2235 			   int recover)
2236 {
2237 	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2238 
2239 	/*
2240 	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
2241 	 * a new request for a NL mode lock being blocked.
2242 	 *
2243 	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2244 	 * request, then it would be granted.  In essence, the use of this flag
2245 	 * tells the Lock Manager to expedite theis request by not considering
2246 	 * what may be in the CONVERTING or WAITING queues...  As of this
2247 	 * writing, the EXPEDITE flag can be used only with new requests for NL
2248 	 * mode locks.  This flag is not valid for conversion requests.
2249 	 *
2250 	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
2251 	 * conversion or used with a non-NL requested mode.  We also know an
2252 	 * EXPEDITE request is always granted immediately, so now must always
2253 	 * be 1.  The full condition to grant an expedite request: (now &&
2254 	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2255 	 * therefore be shortened to just checking the flag.
2256 	 */
2257 
2258 	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2259 		return 1;
2260 
2261 	/*
2262 	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2263 	 * added to the remaining conditions.
2264 	 */
2265 
2266 	if (queue_conflict(&r->res_grantqueue, lkb))
2267 		return 0;
2268 
2269 	/*
2270 	 * 6-3: By default, a conversion request is immediately granted if the
2271 	 * requested mode is compatible with the modes of all other granted
2272 	 * locks
2273 	 */
2274 
2275 	if (queue_conflict(&r->res_convertqueue, lkb))
2276 		return 0;
2277 
2278 	/*
2279 	 * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2280 	 * locks for a recovered rsb, on which lkb's have been rebuilt.
2281 	 * The lkb's may have been rebuilt on the queues in a different
2282 	 * order than they were in on the previous master.  So, granting
2283 	 * queued conversions in order after recovery doesn't make sense
2284 	 * since the order hasn't been preserved anyway.  The new order
2285 	 * could also have created a new "in place" conversion deadlock.
2286 	 * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2287 	 * After recovery, there would be no granted locks, and possibly
2288 	 * NL->EX, PR->EX, an in-place conversion deadlock.)  So, after
2289 	 * recovery, grant conversions without considering order.
2290 	 */
2291 
2292 	if (conv && recover)
2293 		return 1;
2294 
2295 	/*
2296 	 * 6-5: But the default algorithm for deciding whether to grant or
2297 	 * queue conversion requests does not by itself guarantee that such
2298 	 * requests are serviced on a "first come first serve" basis.  This, in
2299 	 * turn, can lead to a phenomenon known as "indefinate postponement".
2300 	 *
2301 	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
2302 	 * the system service employed to request a lock conversion.  This flag
2303 	 * forces certain conversion requests to be queued, even if they are
2304 	 * compatible with the granted modes of other locks on the same
2305 	 * resource.  Thus, the use of this flag results in conversion requests
2306 	 * being ordered on a "first come first servce" basis.
2307 	 *
2308 	 * DCT: This condition is all about new conversions being able to occur
2309 	 * "in place" while the lock remains on the granted queue (assuming
2310 	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
2311 	 * doesn't _have_ to go onto the convert queue where it's processed in
2312 	 * order.  The "now" variable is necessary to distinguish converts
2313 	 * being received and processed for the first time now, because once a
2314 	 * convert is moved to the conversion queue the condition below applies
2315 	 * requiring fifo granting.
2316 	 */
2317 
2318 	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2319 		return 1;
2320 
2321 	/*
2322 	 * Even if the convert is compat with all granted locks,
2323 	 * QUECVT forces it behind other locks on the convert queue.
2324 	 */
2325 
2326 	if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2327 		if (list_empty(&r->res_convertqueue))
2328 			return 1;
2329 		else
2330 			return 0;
2331 	}
2332 
2333 	/*
2334 	 * The NOORDER flag is set to avoid the standard vms rules on grant
2335 	 * order.
2336 	 */
2337 
2338 	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2339 		return 1;
2340 
2341 	/*
2342 	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2343 	 * granted until all other conversion requests ahead of it are granted
2344 	 * and/or canceled.
2345 	 */
2346 
2347 	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2348 		return 1;
2349 
2350 	/*
2351 	 * 6-4: By default, a new request is immediately granted only if all
2352 	 * three of the following conditions are satisfied when the request is
2353 	 * issued:
2354 	 * - The queue of ungranted conversion requests for the resource is
2355 	 *   empty.
2356 	 * - The queue of ungranted new requests for the resource is empty.
2357 	 * - The mode of the new request is compatible with the most
2358 	 *   restrictive mode of all granted locks on the resource.
2359 	 */
2360 
2361 	if (now && !conv && list_empty(&r->res_convertqueue) &&
2362 	    list_empty(&r->res_waitqueue))
2363 		return 1;
2364 
2365 	/*
2366 	 * 6-4: Once a lock request is in the queue of ungranted new requests,
2367 	 * it cannot be granted until the queue of ungranted conversion
2368 	 * requests is empty, all ungranted new requests ahead of it are
2369 	 * granted and/or canceled, and it is compatible with the granted mode
2370 	 * of the most restrictive lock granted on the resource.
2371 	 */
2372 
2373 	if (!now && !conv && list_empty(&r->res_convertqueue) &&
2374 	    first_in_list(lkb, &r->res_waitqueue))
2375 		return 1;
2376 
2377 	return 0;
2378 }
2379 
2380 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2381 			  int recover, int *err)
2382 {
2383 	int rv;
2384 	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2385 	int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2386 
2387 	if (err)
2388 		*err = 0;
2389 
2390 	rv = _can_be_granted(r, lkb, now, recover);
2391 	if (rv)
2392 		goto out;
2393 
2394 	/*
2395 	 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2396 	 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2397 	 * cancels one of the locks.
2398 	 */
2399 
2400 	if (is_convert && can_be_queued(lkb) &&
2401 	    conversion_deadlock_detect(r, lkb)) {
2402 		if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2403 			lkb->lkb_grmode = DLM_LOCK_NL;
2404 			set_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
2405 		} else if (err) {
2406 			*err = -EDEADLK;
2407 		} else {
2408 			log_print("can_be_granted deadlock %x now %d",
2409 				  lkb->lkb_id, now);
2410 			dlm_dump_rsb(r);
2411 		}
2412 		goto out;
2413 	}
2414 
2415 	/*
2416 	 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2417 	 * to grant a request in a mode other than the normal rqmode.  It's a
2418 	 * simple way to provide a big optimization to applications that can
2419 	 * use them.
2420 	 */
2421 
2422 	if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2423 		alt = DLM_LOCK_PR;
2424 	else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2425 		alt = DLM_LOCK_CW;
2426 
2427 	if (alt) {
2428 		lkb->lkb_rqmode = alt;
2429 		rv = _can_be_granted(r, lkb, now, 0);
2430 		if (rv)
2431 			set_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
2432 		else
2433 			lkb->lkb_rqmode = rqmode;
2434 	}
2435  out:
2436 	return rv;
2437 }
2438 
2439 /* Returns the highest requested mode of all blocked conversions; sets
2440    cw if there's a blocked conversion to DLM_LOCK_CW. */
2441 
2442 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2443 				 unsigned int *count)
2444 {
2445 	struct dlm_lkb *lkb, *s;
2446 	int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2447 	int hi, demoted, quit, grant_restart, demote_restart;
2448 	int deadlk;
2449 
2450 	quit = 0;
2451  restart:
2452 	grant_restart = 0;
2453 	demote_restart = 0;
2454 	hi = DLM_LOCK_IV;
2455 
2456 	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2457 		demoted = is_demoted(lkb);
2458 		deadlk = 0;
2459 
2460 		if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2461 			grant_lock_pending(r, lkb);
2462 			grant_restart = 1;
2463 			if (count)
2464 				(*count)++;
2465 			continue;
2466 		}
2467 
2468 		if (!demoted && is_demoted(lkb)) {
2469 			log_print("WARN: pending demoted %x node %d %s",
2470 				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2471 			demote_restart = 1;
2472 			continue;
2473 		}
2474 
2475 		if (deadlk) {
2476 			/*
2477 			 * If DLM_LKB_NODLKWT flag is set and conversion
2478 			 * deadlock is detected, we request blocking AST and
2479 			 * down (or cancel) conversion.
2480 			 */
2481 			if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2482 				if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2483 					queue_bast(r, lkb, lkb->lkb_rqmode);
2484 					lkb->lkb_highbast = lkb->lkb_rqmode;
2485 				}
2486 			} else {
2487 				log_print("WARN: pending deadlock %x node %d %s",
2488 					  lkb->lkb_id, lkb->lkb_nodeid,
2489 					  r->res_name);
2490 				dlm_dump_rsb(r);
2491 			}
2492 			continue;
2493 		}
2494 
2495 		hi = max_t(int, lkb->lkb_rqmode, hi);
2496 
2497 		if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2498 			*cw = 1;
2499 	}
2500 
2501 	if (grant_restart)
2502 		goto restart;
2503 	if (demote_restart && !quit) {
2504 		quit = 1;
2505 		goto restart;
2506 	}
2507 
2508 	return max_t(int, high, hi);
2509 }
2510 
2511 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2512 			      unsigned int *count)
2513 {
2514 	struct dlm_lkb *lkb, *s;
2515 
2516 	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2517 		if (can_be_granted(r, lkb, 0, 0, NULL)) {
2518 			grant_lock_pending(r, lkb);
2519 			if (count)
2520 				(*count)++;
2521 		} else {
2522 			high = max_t(int, lkb->lkb_rqmode, high);
2523 			if (lkb->lkb_rqmode == DLM_LOCK_CW)
2524 				*cw = 1;
2525 		}
2526 	}
2527 
2528 	return high;
2529 }
2530 
2531 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2532    on either the convert or waiting queue.
2533    high is the largest rqmode of all locks blocked on the convert or
2534    waiting queue. */
2535 
2536 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2537 {
2538 	if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2539 		if (gr->lkb_highbast < DLM_LOCK_EX)
2540 			return 1;
2541 		return 0;
2542 	}
2543 
2544 	if (gr->lkb_highbast < high &&
2545 	    !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2546 		return 1;
2547 	return 0;
2548 }
2549 
2550 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2551 {
2552 	struct dlm_lkb *lkb, *s;
2553 	int high = DLM_LOCK_IV;
2554 	int cw = 0;
2555 
2556 	if (!is_master(r)) {
2557 		log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2558 		dlm_dump_rsb(r);
2559 		return;
2560 	}
2561 
2562 	high = grant_pending_convert(r, high, &cw, count);
2563 	high = grant_pending_wait(r, high, &cw, count);
2564 
2565 	if (high == DLM_LOCK_IV)
2566 		return;
2567 
2568 	/*
2569 	 * If there are locks left on the wait/convert queue then send blocking
2570 	 * ASTs to granted locks based on the largest requested mode (high)
2571 	 * found above.
2572 	 */
2573 
2574 	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2575 		if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2576 			if (cw && high == DLM_LOCK_PR &&
2577 			    lkb->lkb_grmode == DLM_LOCK_PR)
2578 				queue_bast(r, lkb, DLM_LOCK_CW);
2579 			else
2580 				queue_bast(r, lkb, high);
2581 			lkb->lkb_highbast = high;
2582 		}
2583 	}
2584 }
2585 
2586 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2587 {
2588 	if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2589 	    (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2590 		if (gr->lkb_highbast < DLM_LOCK_EX)
2591 			return 1;
2592 		return 0;
2593 	}
2594 
2595 	if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2596 		return 1;
2597 	return 0;
2598 }
2599 
2600 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2601 			    struct dlm_lkb *lkb)
2602 {
2603 	struct dlm_lkb *gr;
2604 
2605 	list_for_each_entry(gr, head, lkb_statequeue) {
2606 		/* skip self when sending basts to convertqueue */
2607 		if (gr == lkb)
2608 			continue;
2609 		if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2610 			queue_bast(r, gr, lkb->lkb_rqmode);
2611 			gr->lkb_highbast = lkb->lkb_rqmode;
2612 		}
2613 	}
2614 }
2615 
2616 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2617 {
2618 	send_bast_queue(r, &r->res_grantqueue, lkb);
2619 }
2620 
2621 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2622 {
2623 	send_bast_queue(r, &r->res_grantqueue, lkb);
2624 	send_bast_queue(r, &r->res_convertqueue, lkb);
2625 }
2626 
2627 /* set_master(r, lkb) -- set the master nodeid of a resource
2628 
2629    The purpose of this function is to set the nodeid field in the given
2630    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2631    known, it can just be copied to the lkb and the function will return
2632    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2633    before it can be copied to the lkb.
2634 
2635    When the rsb nodeid is being looked up remotely, the initial lkb
2636    causing the lookup is kept on the ls_waiters list waiting for the
2637    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2638    on the rsb's res_lookup list until the master is verified.
2639 
2640    Return values:
2641    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2642    1: the rsb master is not available and the lkb has been placed on
2643       a wait queue
2644 */
2645 
2646 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2647 {
2648 	int our_nodeid = dlm_our_nodeid();
2649 
2650 	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2651 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2652 		r->res_first_lkid = lkb->lkb_id;
2653 		lkb->lkb_nodeid = r->res_nodeid;
2654 		return 0;
2655 	}
2656 
2657 	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2658 		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2659 		return 1;
2660 	}
2661 
2662 	if (r->res_master_nodeid == our_nodeid) {
2663 		lkb->lkb_nodeid = 0;
2664 		return 0;
2665 	}
2666 
2667 	if (r->res_master_nodeid) {
2668 		lkb->lkb_nodeid = r->res_master_nodeid;
2669 		return 0;
2670 	}
2671 
2672 	if (dlm_dir_nodeid(r) == our_nodeid) {
2673 		/* This is a somewhat unusual case; find_rsb will usually
2674 		   have set res_master_nodeid when dir nodeid is local, but
2675 		   there are cases where we become the dir node after we've
2676 		   past find_rsb and go through _request_lock again.
2677 		   confirm_master() or process_lookup_list() needs to be
2678 		   called after this. */
2679 		log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2680 			  lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2681 			  r->res_name);
2682 		r->res_master_nodeid = our_nodeid;
2683 		r->res_nodeid = 0;
2684 		lkb->lkb_nodeid = 0;
2685 		return 0;
2686 	}
2687 
2688 	r->res_first_lkid = lkb->lkb_id;
2689 	send_lookup(r, lkb);
2690 	return 1;
2691 }
2692 
2693 static void process_lookup_list(struct dlm_rsb *r)
2694 {
2695 	struct dlm_lkb *lkb, *safe;
2696 
2697 	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2698 		list_del_init(&lkb->lkb_rsb_lookup);
2699 		_request_lock(r, lkb);
2700 	}
2701 }
2702 
2703 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2704 
2705 static void confirm_master(struct dlm_rsb *r, int error)
2706 {
2707 	struct dlm_lkb *lkb;
2708 
2709 	if (!r->res_first_lkid)
2710 		return;
2711 
2712 	switch (error) {
2713 	case 0:
2714 	case -EINPROGRESS:
2715 		r->res_first_lkid = 0;
2716 		process_lookup_list(r);
2717 		break;
2718 
2719 	case -EAGAIN:
2720 	case -EBADR:
2721 	case -ENOTBLK:
2722 		/* the remote request failed and won't be retried (it was
2723 		   a NOQUEUE, or has been canceled/unlocked); make a waiting
2724 		   lkb the first_lkid */
2725 
2726 		r->res_first_lkid = 0;
2727 
2728 		if (!list_empty(&r->res_lookup)) {
2729 			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2730 					 lkb_rsb_lookup);
2731 			list_del_init(&lkb->lkb_rsb_lookup);
2732 			r->res_first_lkid = lkb->lkb_id;
2733 			_request_lock(r, lkb);
2734 		}
2735 		break;
2736 
2737 	default:
2738 		log_error(r->res_ls, "confirm_master unknown error %d", error);
2739 	}
2740 }
2741 
2742 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2743 			 int namelen, void (*ast)(void *astparam),
2744 			 void *astparam,
2745 			 void (*bast)(void *astparam, int mode),
2746 			 struct dlm_args *args)
2747 {
2748 	int rv = -EINVAL;
2749 
2750 	/* check for invalid arg usage */
2751 
2752 	if (mode < 0 || mode > DLM_LOCK_EX)
2753 		goto out;
2754 
2755 	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2756 		goto out;
2757 
2758 	if (flags & DLM_LKF_CANCEL)
2759 		goto out;
2760 
2761 	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2762 		goto out;
2763 
2764 	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2765 		goto out;
2766 
2767 	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2768 		goto out;
2769 
2770 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2771 		goto out;
2772 
2773 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2774 		goto out;
2775 
2776 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2777 		goto out;
2778 
2779 	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2780 		goto out;
2781 
2782 	if (!ast || !lksb)
2783 		goto out;
2784 
2785 	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2786 		goto out;
2787 
2788 	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2789 		goto out;
2790 
2791 	/* these args will be copied to the lkb in validate_lock_args,
2792 	   it cannot be done now because when converting locks, fields in
2793 	   an active lkb cannot be modified before locking the rsb */
2794 
2795 	args->flags = flags;
2796 	args->astfn = ast;
2797 	args->astparam = astparam;
2798 	args->bastfn = bast;
2799 	args->mode = mode;
2800 	args->lksb = lksb;
2801 	rv = 0;
2802  out:
2803 	return rv;
2804 }
2805 
2806 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2807 {
2808 	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2809  		      DLM_LKF_FORCEUNLOCK))
2810 		return -EINVAL;
2811 
2812 	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2813 		return -EINVAL;
2814 
2815 	args->flags = flags;
2816 	args->astparam = astarg;
2817 	return 0;
2818 }
2819 
2820 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2821 			      struct dlm_args *args)
2822 {
2823 	int rv = -EBUSY;
2824 
2825 	if (args->flags & DLM_LKF_CONVERT) {
2826 		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2827 			goto out;
2828 
2829 		/* lock not allowed if there's any op in progress */
2830 		if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2831 			goto out;
2832 
2833 		if (is_overlap(lkb))
2834 			goto out;
2835 
2836 		rv = -EINVAL;
2837 		if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
2838 			goto out;
2839 
2840 		if (args->flags & DLM_LKF_QUECVT &&
2841 		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2842 			goto out;
2843 	}
2844 
2845 	lkb->lkb_exflags = args->flags;
2846 	dlm_set_sbflags_val(lkb, 0);
2847 	lkb->lkb_astfn = args->astfn;
2848 	lkb->lkb_astparam = args->astparam;
2849 	lkb->lkb_bastfn = args->bastfn;
2850 	lkb->lkb_rqmode = args->mode;
2851 	lkb->lkb_lksb = args->lksb;
2852 	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2853 	lkb->lkb_ownpid = (int) current->pid;
2854 	rv = 0;
2855  out:
2856 	switch (rv) {
2857 	case 0:
2858 		break;
2859 	case -EINVAL:
2860 		/* annoy the user because dlm usage is wrong */
2861 		WARN_ON(1);
2862 		log_error(ls, "%s %d %x %x %x %d %d %s", __func__,
2863 			  rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2864 			  lkb->lkb_status, lkb->lkb_wait_type,
2865 			  lkb->lkb_resource->res_name);
2866 		break;
2867 	default:
2868 		log_debug(ls, "%s %d %x %x %x %d %d %s", __func__,
2869 			  rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2870 			  lkb->lkb_status, lkb->lkb_wait_type,
2871 			  lkb->lkb_resource->res_name);
2872 		break;
2873 	}
2874 
2875 	return rv;
2876 }
2877 
2878 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2879    for success */
2880 
2881 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2882    because there may be a lookup in progress and it's valid to do
2883    cancel/unlockf on it */
2884 
2885 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2886 {
2887 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2888 	int rv = -EBUSY;
2889 
2890 	/* normal unlock not allowed if there's any op in progress */
2891 	if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) &&
2892 	    (lkb->lkb_wait_type || lkb->lkb_wait_count))
2893 		goto out;
2894 
2895 	/* an lkb may be waiting for an rsb lookup to complete where the
2896 	   lookup was initiated by another lock */
2897 
2898 	if (!list_empty(&lkb->lkb_rsb_lookup)) {
2899 		if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2900 			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2901 			list_del_init(&lkb->lkb_rsb_lookup);
2902 			queue_cast(lkb->lkb_resource, lkb,
2903 				   args->flags & DLM_LKF_CANCEL ?
2904 				   -DLM_ECANCEL : -DLM_EUNLOCK);
2905 			unhold_lkb(lkb); /* undoes create_lkb() */
2906 		}
2907 		/* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2908 		goto out;
2909 	}
2910 
2911 	rv = -EINVAL;
2912 	if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
2913 		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2914 		dlm_print_lkb(lkb);
2915 		goto out;
2916 	}
2917 
2918 	/* an lkb may still exist even though the lock is EOL'ed due to a
2919 	 * cancel, unlock or failed noqueue request; an app can't use these
2920 	 * locks; return same error as if the lkid had not been found at all
2921 	 */
2922 
2923 	if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
2924 		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2925 		rv = -ENOENT;
2926 		goto out;
2927 	}
2928 
2929 	/* cancel not allowed with another cancel/unlock in progress */
2930 
2931 	if (args->flags & DLM_LKF_CANCEL) {
2932 		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2933 			goto out;
2934 
2935 		if (is_overlap(lkb))
2936 			goto out;
2937 
2938 		if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2939 			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2940 			rv = -EBUSY;
2941 			goto out;
2942 		}
2943 
2944 		/* there's nothing to cancel */
2945 		if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2946 		    !lkb->lkb_wait_type) {
2947 			rv = -EBUSY;
2948 			goto out;
2949 		}
2950 
2951 		switch (lkb->lkb_wait_type) {
2952 		case DLM_MSG_LOOKUP:
2953 		case DLM_MSG_REQUEST:
2954 			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2955 			rv = -EBUSY;
2956 			goto out;
2957 		case DLM_MSG_UNLOCK:
2958 		case DLM_MSG_CANCEL:
2959 			goto out;
2960 		}
2961 		/* add_to_waiters() will set OVERLAP_CANCEL */
2962 		goto out_ok;
2963 	}
2964 
2965 	/* do we need to allow a force-unlock if there's a normal unlock
2966 	   already in progress?  in what conditions could the normal unlock
2967 	   fail such that we'd want to send a force-unlock to be sure? */
2968 
2969 	if (args->flags & DLM_LKF_FORCEUNLOCK) {
2970 		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2971 			goto out;
2972 
2973 		if (is_overlap_unlock(lkb))
2974 			goto out;
2975 
2976 		if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2977 			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2978 			rv = -EBUSY;
2979 			goto out;
2980 		}
2981 
2982 		switch (lkb->lkb_wait_type) {
2983 		case DLM_MSG_LOOKUP:
2984 		case DLM_MSG_REQUEST:
2985 			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2986 			rv = -EBUSY;
2987 			goto out;
2988 		case DLM_MSG_UNLOCK:
2989 			goto out;
2990 		}
2991 		/* add_to_waiters() will set OVERLAP_UNLOCK */
2992 	}
2993 
2994  out_ok:
2995 	/* an overlapping op shouldn't blow away exflags from other op */
2996 	lkb->lkb_exflags |= args->flags;
2997 	dlm_set_sbflags_val(lkb, 0);
2998 	lkb->lkb_astparam = args->astparam;
2999 	rv = 0;
3000  out:
3001 	switch (rv) {
3002 	case 0:
3003 		break;
3004 	case -EINVAL:
3005 		/* annoy the user because dlm usage is wrong */
3006 		WARN_ON(1);
3007 		log_error(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
3008 			  lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
3009 			  args->flags, lkb->lkb_wait_type,
3010 			  lkb->lkb_resource->res_name);
3011 		break;
3012 	default:
3013 		log_debug(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
3014 			  lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
3015 			  args->flags, lkb->lkb_wait_type,
3016 			  lkb->lkb_resource->res_name);
3017 		break;
3018 	}
3019 
3020 	return rv;
3021 }
3022 
3023 /*
3024  * Four stage 4 varieties:
3025  * do_request(), do_convert(), do_unlock(), do_cancel()
3026  * These are called on the master node for the given lock and
3027  * from the central locking logic.
3028  */
3029 
3030 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3031 {
3032 	int error = 0;
3033 
3034 	if (can_be_granted(r, lkb, 1, 0, NULL)) {
3035 		grant_lock(r, lkb);
3036 		queue_cast(r, lkb, 0);
3037 		goto out;
3038 	}
3039 
3040 	if (can_be_queued(lkb)) {
3041 		error = -EINPROGRESS;
3042 		add_lkb(r, lkb, DLM_LKSTS_WAITING);
3043 		goto out;
3044 	}
3045 
3046 	error = -EAGAIN;
3047 	queue_cast(r, lkb, -EAGAIN);
3048  out:
3049 	return error;
3050 }
3051 
3052 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3053 			       int error)
3054 {
3055 	switch (error) {
3056 	case -EAGAIN:
3057 		if (force_blocking_asts(lkb))
3058 			send_blocking_asts_all(r, lkb);
3059 		break;
3060 	case -EINPROGRESS:
3061 		send_blocking_asts(r, lkb);
3062 		break;
3063 	}
3064 }
3065 
3066 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3067 {
3068 	int error = 0;
3069 	int deadlk = 0;
3070 
3071 	/* changing an existing lock may allow others to be granted */
3072 
3073 	if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3074 		grant_lock(r, lkb);
3075 		queue_cast(r, lkb, 0);
3076 		goto out;
3077 	}
3078 
3079 	/* can_be_granted() detected that this lock would block in a conversion
3080 	   deadlock, so we leave it on the granted queue and return EDEADLK in
3081 	   the ast for the convert. */
3082 
3083 	if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
3084 		/* it's left on the granted queue */
3085 		revert_lock(r, lkb);
3086 		queue_cast(r, lkb, -EDEADLK);
3087 		error = -EDEADLK;
3088 		goto out;
3089 	}
3090 
3091 	/* is_demoted() means the can_be_granted() above set the grmode
3092 	   to NL, and left us on the granted queue.  This auto-demotion
3093 	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
3094 	   now grantable.  We have to try to grant other converting locks
3095 	   before we try again to grant this one. */
3096 
3097 	if (is_demoted(lkb)) {
3098 		grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3099 		if (_can_be_granted(r, lkb, 1, 0)) {
3100 			grant_lock(r, lkb);
3101 			queue_cast(r, lkb, 0);
3102 			goto out;
3103 		}
3104 		/* else fall through and move to convert queue */
3105 	}
3106 
3107 	if (can_be_queued(lkb)) {
3108 		error = -EINPROGRESS;
3109 		del_lkb(r, lkb);
3110 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3111 		goto out;
3112 	}
3113 
3114 	error = -EAGAIN;
3115 	queue_cast(r, lkb, -EAGAIN);
3116  out:
3117 	return error;
3118 }
3119 
3120 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3121 			       int error)
3122 {
3123 	switch (error) {
3124 	case 0:
3125 		grant_pending_locks(r, NULL);
3126 		/* grant_pending_locks also sends basts */
3127 		break;
3128 	case -EAGAIN:
3129 		if (force_blocking_asts(lkb))
3130 			send_blocking_asts_all(r, lkb);
3131 		break;
3132 	case -EINPROGRESS:
3133 		send_blocking_asts(r, lkb);
3134 		break;
3135 	}
3136 }
3137 
3138 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3139 {
3140 	remove_lock(r, lkb);
3141 	queue_cast(r, lkb, -DLM_EUNLOCK);
3142 	return -DLM_EUNLOCK;
3143 }
3144 
3145 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3146 			      int error)
3147 {
3148 	grant_pending_locks(r, NULL);
3149 }
3150 
3151 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3152 
3153 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3154 {
3155 	int error;
3156 
3157 	error = revert_lock(r, lkb);
3158 	if (error) {
3159 		queue_cast(r, lkb, -DLM_ECANCEL);
3160 		return -DLM_ECANCEL;
3161 	}
3162 	return 0;
3163 }
3164 
3165 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3166 			      int error)
3167 {
3168 	if (error)
3169 		grant_pending_locks(r, NULL);
3170 }
3171 
3172 /*
3173  * Four stage 3 varieties:
3174  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3175  */
3176 
3177 /* add a new lkb to a possibly new rsb, called by requesting process */
3178 
3179 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3180 {
3181 	int error;
3182 
3183 	/* set_master: sets lkb nodeid from r */
3184 
3185 	error = set_master(r, lkb);
3186 	if (error < 0)
3187 		goto out;
3188 	if (error) {
3189 		error = 0;
3190 		goto out;
3191 	}
3192 
3193 	if (is_remote(r)) {
3194 		/* receive_request() calls do_request() on remote node */
3195 		error = send_request(r, lkb);
3196 	} else {
3197 		error = do_request(r, lkb);
3198 		/* for remote locks the request_reply is sent
3199 		   between do_request and do_request_effects */
3200 		do_request_effects(r, lkb, error);
3201 	}
3202  out:
3203 	return error;
3204 }
3205 
3206 /* change some property of an existing lkb, e.g. mode */
3207 
3208 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3209 {
3210 	int error;
3211 
3212 	if (is_remote(r)) {
3213 		/* receive_convert() calls do_convert() on remote node */
3214 		error = send_convert(r, lkb);
3215 	} else {
3216 		error = do_convert(r, lkb);
3217 		/* for remote locks the convert_reply is sent
3218 		   between do_convert and do_convert_effects */
3219 		do_convert_effects(r, lkb, error);
3220 	}
3221 
3222 	return error;
3223 }
3224 
3225 /* remove an existing lkb from the granted queue */
3226 
3227 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3228 {
3229 	int error;
3230 
3231 	if (is_remote(r)) {
3232 		/* receive_unlock() calls do_unlock() on remote node */
3233 		error = send_unlock(r, lkb);
3234 	} else {
3235 		error = do_unlock(r, lkb);
3236 		/* for remote locks the unlock_reply is sent
3237 		   between do_unlock and do_unlock_effects */
3238 		do_unlock_effects(r, lkb, error);
3239 	}
3240 
3241 	return error;
3242 }
3243 
3244 /* remove an existing lkb from the convert or wait queue */
3245 
3246 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3247 {
3248 	int error;
3249 
3250 	if (is_remote(r)) {
3251 		/* receive_cancel() calls do_cancel() on remote node */
3252 		error = send_cancel(r, lkb);
3253 	} else {
3254 		error = do_cancel(r, lkb);
3255 		/* for remote locks the cancel_reply is sent
3256 		   between do_cancel and do_cancel_effects */
3257 		do_cancel_effects(r, lkb, error);
3258 	}
3259 
3260 	return error;
3261 }
3262 
3263 /*
3264  * Four stage 2 varieties:
3265  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3266  */
3267 
3268 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3269 			const void *name, int len,
3270 			struct dlm_args *args)
3271 {
3272 	struct dlm_rsb *r;
3273 	int error;
3274 
3275 	error = validate_lock_args(ls, lkb, args);
3276 	if (error)
3277 		return error;
3278 
3279 	error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3280 	if (error)
3281 		return error;
3282 
3283 	lock_rsb(r);
3284 
3285 	attach_lkb(r, lkb);
3286 	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3287 
3288 	error = _request_lock(r, lkb);
3289 
3290 	unlock_rsb(r);
3291 	put_rsb(r);
3292 	return error;
3293 }
3294 
3295 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3296 			struct dlm_args *args)
3297 {
3298 	struct dlm_rsb *r;
3299 	int error;
3300 
3301 	r = lkb->lkb_resource;
3302 
3303 	hold_rsb(r);
3304 	lock_rsb(r);
3305 
3306 	error = validate_lock_args(ls, lkb, args);
3307 	if (error)
3308 		goto out;
3309 
3310 	error = _convert_lock(r, lkb);
3311  out:
3312 	unlock_rsb(r);
3313 	put_rsb(r);
3314 	return error;
3315 }
3316 
3317 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3318 		       struct dlm_args *args)
3319 {
3320 	struct dlm_rsb *r;
3321 	int error;
3322 
3323 	r = lkb->lkb_resource;
3324 
3325 	hold_rsb(r);
3326 	lock_rsb(r);
3327 
3328 	error = validate_unlock_args(lkb, args);
3329 	if (error)
3330 		goto out;
3331 
3332 	error = _unlock_lock(r, lkb);
3333  out:
3334 	unlock_rsb(r);
3335 	put_rsb(r);
3336 	return error;
3337 }
3338 
3339 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3340 		       struct dlm_args *args)
3341 {
3342 	struct dlm_rsb *r;
3343 	int error;
3344 
3345 	r = lkb->lkb_resource;
3346 
3347 	hold_rsb(r);
3348 	lock_rsb(r);
3349 
3350 	error = validate_unlock_args(lkb, args);
3351 	if (error)
3352 		goto out;
3353 
3354 	error = _cancel_lock(r, lkb);
3355  out:
3356 	unlock_rsb(r);
3357 	put_rsb(r);
3358 	return error;
3359 }
3360 
3361 /*
3362  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
3363  */
3364 
3365 int dlm_lock(dlm_lockspace_t *lockspace,
3366 	     int mode,
3367 	     struct dlm_lksb *lksb,
3368 	     uint32_t flags,
3369 	     const void *name,
3370 	     unsigned int namelen,
3371 	     uint32_t parent_lkid,
3372 	     void (*ast) (void *astarg),
3373 	     void *astarg,
3374 	     void (*bast) (void *astarg, int mode))
3375 {
3376 	struct dlm_ls *ls;
3377 	struct dlm_lkb *lkb;
3378 	struct dlm_args args;
3379 	int error, convert = flags & DLM_LKF_CONVERT;
3380 
3381 	ls = dlm_find_lockspace_local(lockspace);
3382 	if (!ls)
3383 		return -EINVAL;
3384 
3385 	dlm_lock_recovery(ls);
3386 
3387 	if (convert)
3388 		error = find_lkb(ls, lksb->sb_lkid, &lkb);
3389 	else
3390 		error = create_lkb(ls, &lkb);
3391 
3392 	if (error)
3393 		goto out;
3394 
3395 	trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
3396 
3397 	error = set_lock_args(mode, lksb, flags, namelen, ast, astarg, bast,
3398 			      &args);
3399 	if (error)
3400 		goto out_put;
3401 
3402 	if (convert)
3403 		error = convert_lock(ls, lkb, &args);
3404 	else
3405 		error = request_lock(ls, lkb, name, namelen, &args);
3406 
3407 	if (error == -EINPROGRESS)
3408 		error = 0;
3409  out_put:
3410 	trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true);
3411 
3412 	if (convert || error)
3413 		__put_lkb(ls, lkb);
3414 	if (error == -EAGAIN || error == -EDEADLK)
3415 		error = 0;
3416  out:
3417 	dlm_unlock_recovery(ls);
3418 	dlm_put_lockspace(ls);
3419 	return error;
3420 }
3421 
3422 int dlm_unlock(dlm_lockspace_t *lockspace,
3423 	       uint32_t lkid,
3424 	       uint32_t flags,
3425 	       struct dlm_lksb *lksb,
3426 	       void *astarg)
3427 {
3428 	struct dlm_ls *ls;
3429 	struct dlm_lkb *lkb;
3430 	struct dlm_args args;
3431 	int error;
3432 
3433 	ls = dlm_find_lockspace_local(lockspace);
3434 	if (!ls)
3435 		return -EINVAL;
3436 
3437 	dlm_lock_recovery(ls);
3438 
3439 	error = find_lkb(ls, lkid, &lkb);
3440 	if (error)
3441 		goto out;
3442 
3443 	trace_dlm_unlock_start(ls, lkb, flags);
3444 
3445 	error = set_unlock_args(flags, astarg, &args);
3446 	if (error)
3447 		goto out_put;
3448 
3449 	if (flags & DLM_LKF_CANCEL)
3450 		error = cancel_lock(ls, lkb, &args);
3451 	else
3452 		error = unlock_lock(ls, lkb, &args);
3453 
3454 	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3455 		error = 0;
3456 	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3457 		error = 0;
3458  out_put:
3459 	trace_dlm_unlock_end(ls, lkb, flags, error);
3460 
3461 	dlm_put_lkb(lkb);
3462  out:
3463 	dlm_unlock_recovery(ls);
3464 	dlm_put_lockspace(ls);
3465 	return error;
3466 }
3467 
3468 /*
3469  * send/receive routines for remote operations and replies
3470  *
3471  * send_args
3472  * send_common
3473  * send_request			receive_request
3474  * send_convert			receive_convert
3475  * send_unlock			receive_unlock
3476  * send_cancel			receive_cancel
3477  * send_grant			receive_grant
3478  * send_bast			receive_bast
3479  * send_lookup			receive_lookup
3480  * send_remove			receive_remove
3481  *
3482  * 				send_common_reply
3483  * receive_request_reply	send_request_reply
3484  * receive_convert_reply	send_convert_reply
3485  * receive_unlock_reply		send_unlock_reply
3486  * receive_cancel_reply		send_cancel_reply
3487  * receive_lookup_reply		send_lookup_reply
3488  */
3489 
3490 static int _create_message(struct dlm_ls *ls, int mb_len,
3491 			   int to_nodeid, int mstype,
3492 			   struct dlm_message **ms_ret,
3493 			   struct dlm_mhandle **mh_ret)
3494 {
3495 	struct dlm_message *ms;
3496 	struct dlm_mhandle *mh;
3497 	char *mb;
3498 
3499 	/* get_buffer gives us a message handle (mh) that we need to
3500 	   pass into midcomms_commit and a message buffer (mb) that we
3501 	   write our data into */
3502 
3503 	mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb);
3504 	if (!mh)
3505 		return -ENOBUFS;
3506 
3507 	ms = (struct dlm_message *) mb;
3508 
3509 	ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3510 	ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
3511 	ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
3512 	ms->m_header.h_length = cpu_to_le16(mb_len);
3513 	ms->m_header.h_cmd = DLM_MSG;
3514 
3515 	ms->m_type = cpu_to_le32(mstype);
3516 
3517 	*mh_ret = mh;
3518 	*ms_ret = ms;
3519 	return 0;
3520 }
3521 
3522 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3523 			  int to_nodeid, int mstype,
3524 			  struct dlm_message **ms_ret,
3525 			  struct dlm_mhandle **mh_ret)
3526 {
3527 	int mb_len = sizeof(struct dlm_message);
3528 
3529 	switch (mstype) {
3530 	case DLM_MSG_REQUEST:
3531 	case DLM_MSG_LOOKUP:
3532 	case DLM_MSG_REMOVE:
3533 		mb_len += r->res_length;
3534 		break;
3535 	case DLM_MSG_CONVERT:
3536 	case DLM_MSG_UNLOCK:
3537 	case DLM_MSG_REQUEST_REPLY:
3538 	case DLM_MSG_CONVERT_REPLY:
3539 	case DLM_MSG_GRANT:
3540 		if (lkb && lkb->lkb_lvbptr && (lkb->lkb_exflags & DLM_LKF_VALBLK))
3541 			mb_len += r->res_ls->ls_lvblen;
3542 		break;
3543 	}
3544 
3545 	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3546 			       ms_ret, mh_ret);
3547 }
3548 
3549 /* further lowcomms enhancements or alternate implementations may make
3550    the return value from this function useful at some point */
3551 
3552 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms,
3553 			const void *name, int namelen)
3554 {
3555 	dlm_midcomms_commit_mhandle(mh, name, namelen);
3556 	return 0;
3557 }
3558 
3559 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3560 		      struct dlm_message *ms)
3561 {
3562 	ms->m_nodeid   = cpu_to_le32(lkb->lkb_nodeid);
3563 	ms->m_pid      = cpu_to_le32(lkb->lkb_ownpid);
3564 	ms->m_lkid     = cpu_to_le32(lkb->lkb_id);
3565 	ms->m_remid    = cpu_to_le32(lkb->lkb_remid);
3566 	ms->m_exflags  = cpu_to_le32(lkb->lkb_exflags);
3567 	ms->m_sbflags  = cpu_to_le32(dlm_sbflags_val(lkb));
3568 	ms->m_flags    = cpu_to_le32(dlm_dflags_val(lkb));
3569 	ms->m_lvbseq   = cpu_to_le32(lkb->lkb_lvbseq);
3570 	ms->m_status   = cpu_to_le32(lkb->lkb_status);
3571 	ms->m_grmode   = cpu_to_le32(lkb->lkb_grmode);
3572 	ms->m_rqmode   = cpu_to_le32(lkb->lkb_rqmode);
3573 	ms->m_hash     = cpu_to_le32(r->res_hash);
3574 
3575 	/* m_result and m_bastmode are set from function args,
3576 	   not from lkb fields */
3577 
3578 	if (lkb->lkb_bastfn)
3579 		ms->m_asts |= cpu_to_le32(DLM_CB_BAST);
3580 	if (lkb->lkb_astfn)
3581 		ms->m_asts |= cpu_to_le32(DLM_CB_CAST);
3582 
3583 	/* compare with switch in create_message; send_remove() doesn't
3584 	   use send_args() */
3585 
3586 	switch (ms->m_type) {
3587 	case cpu_to_le32(DLM_MSG_REQUEST):
3588 	case cpu_to_le32(DLM_MSG_LOOKUP):
3589 		memcpy(ms->m_extra, r->res_name, r->res_length);
3590 		break;
3591 	case cpu_to_le32(DLM_MSG_CONVERT):
3592 	case cpu_to_le32(DLM_MSG_UNLOCK):
3593 	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3594 	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3595 	case cpu_to_le32(DLM_MSG_GRANT):
3596 		if (!lkb->lkb_lvbptr || !(lkb->lkb_exflags & DLM_LKF_VALBLK))
3597 			break;
3598 		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3599 		break;
3600 	}
3601 }
3602 
3603 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3604 {
3605 	struct dlm_message *ms;
3606 	struct dlm_mhandle *mh;
3607 	int to_nodeid, error;
3608 
3609 	to_nodeid = r->res_nodeid;
3610 
3611 	error = add_to_waiters(lkb, mstype, to_nodeid);
3612 	if (error)
3613 		return error;
3614 
3615 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3616 	if (error)
3617 		goto fail;
3618 
3619 	send_args(r, lkb, ms);
3620 
3621 	error = send_message(mh, ms, r->res_name, r->res_length);
3622 	if (error)
3623 		goto fail;
3624 	return 0;
3625 
3626  fail:
3627 	remove_from_waiters(lkb, msg_reply_type(mstype));
3628 	return error;
3629 }
3630 
3631 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3632 {
3633 	return send_common(r, lkb, DLM_MSG_REQUEST);
3634 }
3635 
3636 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3637 {
3638 	int error;
3639 
3640 	error = send_common(r, lkb, DLM_MSG_CONVERT);
3641 
3642 	/* down conversions go without a reply from the master */
3643 	if (!error && down_conversion(lkb)) {
3644 		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3645 		r->res_ls->ls_local_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
3646 		r->res_ls->ls_local_ms.m_result = 0;
3647 		__receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms, true);
3648 	}
3649 
3650 	return error;
3651 }
3652 
3653 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3654    MASTER_UNCERTAIN to force the next request on the rsb to confirm
3655    that the master is still correct. */
3656 
3657 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3658 {
3659 	return send_common(r, lkb, DLM_MSG_UNLOCK);
3660 }
3661 
3662 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3663 {
3664 	return send_common(r, lkb, DLM_MSG_CANCEL);
3665 }
3666 
3667 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3668 {
3669 	struct dlm_message *ms;
3670 	struct dlm_mhandle *mh;
3671 	int to_nodeid, error;
3672 
3673 	to_nodeid = lkb->lkb_nodeid;
3674 
3675 	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3676 	if (error)
3677 		goto out;
3678 
3679 	send_args(r, lkb, ms);
3680 
3681 	ms->m_result = 0;
3682 
3683 	error = send_message(mh, ms, r->res_name, r->res_length);
3684  out:
3685 	return error;
3686 }
3687 
3688 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3689 {
3690 	struct dlm_message *ms;
3691 	struct dlm_mhandle *mh;
3692 	int to_nodeid, error;
3693 
3694 	to_nodeid = lkb->lkb_nodeid;
3695 
3696 	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3697 	if (error)
3698 		goto out;
3699 
3700 	send_args(r, lkb, ms);
3701 
3702 	ms->m_bastmode = cpu_to_le32(mode);
3703 
3704 	error = send_message(mh, ms, r->res_name, r->res_length);
3705  out:
3706 	return error;
3707 }
3708 
3709 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3710 {
3711 	struct dlm_message *ms;
3712 	struct dlm_mhandle *mh;
3713 	int to_nodeid, error;
3714 
3715 	to_nodeid = dlm_dir_nodeid(r);
3716 
3717 	error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3718 	if (error)
3719 		return error;
3720 
3721 	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3722 	if (error)
3723 		goto fail;
3724 
3725 	send_args(r, lkb, ms);
3726 
3727 	error = send_message(mh, ms, r->res_name, r->res_length);
3728 	if (error)
3729 		goto fail;
3730 	return 0;
3731 
3732  fail:
3733 	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3734 	return error;
3735 }
3736 
3737 static int send_remove(struct dlm_rsb *r)
3738 {
3739 	struct dlm_message *ms;
3740 	struct dlm_mhandle *mh;
3741 	int to_nodeid, error;
3742 
3743 	to_nodeid = dlm_dir_nodeid(r);
3744 
3745 	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3746 	if (error)
3747 		goto out;
3748 
3749 	memcpy(ms->m_extra, r->res_name, r->res_length);
3750 	ms->m_hash = cpu_to_le32(r->res_hash);
3751 
3752 	error = send_message(mh, ms, r->res_name, r->res_length);
3753  out:
3754 	return error;
3755 }
3756 
3757 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3758 			     int mstype, int rv)
3759 {
3760 	struct dlm_message *ms;
3761 	struct dlm_mhandle *mh;
3762 	int to_nodeid, error;
3763 
3764 	to_nodeid = lkb->lkb_nodeid;
3765 
3766 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3767 	if (error)
3768 		goto out;
3769 
3770 	send_args(r, lkb, ms);
3771 
3772 	ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3773 
3774 	error = send_message(mh, ms, r->res_name, r->res_length);
3775  out:
3776 	return error;
3777 }
3778 
3779 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3780 {
3781 	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3782 }
3783 
3784 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3785 {
3786 	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3787 }
3788 
3789 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3790 {
3791 	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3792 }
3793 
3794 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3795 {
3796 	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3797 }
3798 
3799 static int send_lookup_reply(struct dlm_ls *ls,
3800 			     const struct dlm_message *ms_in, int ret_nodeid,
3801 			     int rv)
3802 {
3803 	struct dlm_rsb *r = &ls->ls_local_rsb;
3804 	struct dlm_message *ms;
3805 	struct dlm_mhandle *mh;
3806 	int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
3807 
3808 	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3809 	if (error)
3810 		goto out;
3811 
3812 	ms->m_lkid = ms_in->m_lkid;
3813 	ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3814 	ms->m_nodeid = cpu_to_le32(ret_nodeid);
3815 
3816 	error = send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in));
3817  out:
3818 	return error;
3819 }
3820 
3821 /* which args we save from a received message depends heavily on the type
3822    of message, unlike the send side where we can safely send everything about
3823    the lkb for any type of message */
3824 
3825 static void receive_flags(struct dlm_lkb *lkb, const struct dlm_message *ms)
3826 {
3827 	lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
3828 	dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3829 	dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3830 }
3831 
3832 static void receive_flags_reply(struct dlm_lkb *lkb,
3833 				const struct dlm_message *ms,
3834 				bool local)
3835 {
3836 	if (local)
3837 		return;
3838 
3839 	dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3840 	dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3841 }
3842 
3843 static int receive_extralen(const struct dlm_message *ms)
3844 {
3845 	return (le16_to_cpu(ms->m_header.h_length) -
3846 		sizeof(struct dlm_message));
3847 }
3848 
3849 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3850 		       const struct dlm_message *ms)
3851 {
3852 	int len;
3853 
3854 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3855 		if (!lkb->lkb_lvbptr)
3856 			lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3857 		if (!lkb->lkb_lvbptr)
3858 			return -ENOMEM;
3859 		len = receive_extralen(ms);
3860 		if (len > ls->ls_lvblen)
3861 			len = ls->ls_lvblen;
3862 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3863 	}
3864 	return 0;
3865 }
3866 
3867 static void fake_bastfn(void *astparam, int mode)
3868 {
3869 	log_print("fake_bastfn should not be called");
3870 }
3871 
3872 static void fake_astfn(void *astparam)
3873 {
3874 	log_print("fake_astfn should not be called");
3875 }
3876 
3877 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3878 				const struct dlm_message *ms)
3879 {
3880 	lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3881 	lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
3882 	lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3883 	lkb->lkb_grmode = DLM_LOCK_IV;
3884 	lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3885 
3886 	lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;
3887 	lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL;
3888 
3889 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3890 		/* lkb was just created so there won't be an lvb yet */
3891 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3892 		if (!lkb->lkb_lvbptr)
3893 			return -ENOMEM;
3894 	}
3895 
3896 	return 0;
3897 }
3898 
3899 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3900 				const struct dlm_message *ms)
3901 {
3902 	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3903 		return -EBUSY;
3904 
3905 	if (receive_lvb(ls, lkb, ms))
3906 		return -ENOMEM;
3907 
3908 	lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3909 	lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
3910 
3911 	return 0;
3912 }
3913 
3914 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3915 			       const struct dlm_message *ms)
3916 {
3917 	if (receive_lvb(ls, lkb, ms))
3918 		return -ENOMEM;
3919 	return 0;
3920 }
3921 
3922 /* We fill in the local-lkb fields with the info that send_xxxx_reply()
3923    uses to send a reply and that the remote end uses to process the reply. */
3924 
3925 static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms)
3926 {
3927 	struct dlm_lkb *lkb = &ls->ls_local_lkb;
3928 	lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3929 	lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3930 }
3931 
3932 /* This is called after the rsb is locked so that we can safely inspect
3933    fields in the lkb. */
3934 
3935 static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
3936 {
3937 	int from = le32_to_cpu(ms->m_header.h_nodeid);
3938 	int error = 0;
3939 
3940 	/* currently mixing of user/kernel locks are not supported */
3941 	if (ms->m_flags & cpu_to_le32(BIT(DLM_DFL_USER_BIT)) &&
3942 	    !test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
3943 		log_error(lkb->lkb_resource->res_ls,
3944 			  "got user dlm message for a kernel lock");
3945 		error = -EINVAL;
3946 		goto out;
3947 	}
3948 
3949 	switch (ms->m_type) {
3950 	case cpu_to_le32(DLM_MSG_CONVERT):
3951 	case cpu_to_le32(DLM_MSG_UNLOCK):
3952 	case cpu_to_le32(DLM_MSG_CANCEL):
3953 		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3954 			error = -EINVAL;
3955 		break;
3956 
3957 	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3958 	case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
3959 	case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
3960 	case cpu_to_le32(DLM_MSG_GRANT):
3961 	case cpu_to_le32(DLM_MSG_BAST):
3962 		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3963 			error = -EINVAL;
3964 		break;
3965 
3966 	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3967 		if (!is_process_copy(lkb))
3968 			error = -EINVAL;
3969 		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3970 			error = -EINVAL;
3971 		break;
3972 
3973 	default:
3974 		error = -EINVAL;
3975 	}
3976 
3977 out:
3978 	if (error)
3979 		log_error(lkb->lkb_resource->res_ls,
3980 			  "ignore invalid message %d from %d %x %x %x %d",
3981 			  le32_to_cpu(ms->m_type), from, lkb->lkb_id,
3982 			  lkb->lkb_remid, dlm_iflags_val(lkb),
3983 			  lkb->lkb_nodeid);
3984 	return error;
3985 }
3986 
3987 static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms)
3988 {
3989 	struct dlm_lkb *lkb;
3990 	struct dlm_rsb *r;
3991 	int from_nodeid;
3992 	int error, namelen = 0;
3993 
3994 	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3995 
3996 	error = create_lkb(ls, &lkb);
3997 	if (error)
3998 		goto fail;
3999 
4000 	receive_flags(lkb, ms);
4001 	set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
4002 	error = receive_request_args(ls, lkb, ms);
4003 	if (error) {
4004 		__put_lkb(ls, lkb);
4005 		goto fail;
4006 	}
4007 
4008 	/* The dir node is the authority on whether we are the master
4009 	   for this rsb or not, so if the master sends us a request, we should
4010 	   recreate the rsb if we've destroyed it.   This race happens when we
4011 	   send a remove message to the dir node at the same time that the dir
4012 	   node sends us a request for the rsb. */
4013 
4014 	namelen = receive_extralen(ms);
4015 
4016 	error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4017 			 R_RECEIVE_REQUEST, &r);
4018 	if (error) {
4019 		__put_lkb(ls, lkb);
4020 		goto fail;
4021 	}
4022 
4023 	lock_rsb(r);
4024 
4025 	if (r->res_master_nodeid != dlm_our_nodeid()) {
4026 		error = validate_master_nodeid(ls, r, from_nodeid);
4027 		if (error) {
4028 			unlock_rsb(r);
4029 			put_rsb(r);
4030 			__put_lkb(ls, lkb);
4031 			goto fail;
4032 		}
4033 	}
4034 
4035 	attach_lkb(r, lkb);
4036 	error = do_request(r, lkb);
4037 	send_request_reply(r, lkb, error);
4038 	do_request_effects(r, lkb, error);
4039 
4040 	unlock_rsb(r);
4041 	put_rsb(r);
4042 
4043 	if (error == -EINPROGRESS)
4044 		error = 0;
4045 	if (error)
4046 		dlm_put_lkb(lkb);
4047 	return 0;
4048 
4049  fail:
4050 	/* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4051 	   and do this receive_request again from process_lookup_list once
4052 	   we get the lookup reply.  This would avoid a many repeated
4053 	   ENOTBLK request failures when the lookup reply designating us
4054 	   as master is delayed. */
4055 
4056 	if (error != -ENOTBLK) {
4057 		log_limit(ls, "receive_request %x from %d %d",
4058 			  le32_to_cpu(ms->m_lkid), from_nodeid, error);
4059 	}
4060 
4061 	setup_local_lkb(ls, ms);
4062 	send_request_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4063 	return error;
4064 }
4065 
4066 static int receive_convert(struct dlm_ls *ls, const struct dlm_message *ms)
4067 {
4068 	struct dlm_lkb *lkb;
4069 	struct dlm_rsb *r;
4070 	int error, reply = 1;
4071 
4072 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4073 	if (error)
4074 		goto fail;
4075 
4076 	if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4077 		log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4078 			  "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4079 			  (unsigned long long)lkb->lkb_recover_seq,
4080 			  le32_to_cpu(ms->m_header.h_nodeid),
4081 			  le32_to_cpu(ms->m_lkid));
4082 		error = -ENOENT;
4083 		dlm_put_lkb(lkb);
4084 		goto fail;
4085 	}
4086 
4087 	r = lkb->lkb_resource;
4088 
4089 	hold_rsb(r);
4090 	lock_rsb(r);
4091 
4092 	error = validate_message(lkb, ms);
4093 	if (error)
4094 		goto out;
4095 
4096 	receive_flags(lkb, ms);
4097 
4098 	error = receive_convert_args(ls, lkb, ms);
4099 	if (error) {
4100 		send_convert_reply(r, lkb, error);
4101 		goto out;
4102 	}
4103 
4104 	reply = !down_conversion(lkb);
4105 
4106 	error = do_convert(r, lkb);
4107 	if (reply)
4108 		send_convert_reply(r, lkb, error);
4109 	do_convert_effects(r, lkb, error);
4110  out:
4111 	unlock_rsb(r);
4112 	put_rsb(r);
4113 	dlm_put_lkb(lkb);
4114 	return 0;
4115 
4116  fail:
4117 	setup_local_lkb(ls, ms);
4118 	send_convert_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4119 	return error;
4120 }
4121 
4122 static int receive_unlock(struct dlm_ls *ls, const struct dlm_message *ms)
4123 {
4124 	struct dlm_lkb *lkb;
4125 	struct dlm_rsb *r;
4126 	int error;
4127 
4128 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4129 	if (error)
4130 		goto fail;
4131 
4132 	if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4133 		log_error(ls, "receive_unlock %x remid %x remote %d %x",
4134 			  lkb->lkb_id, lkb->lkb_remid,
4135 			  le32_to_cpu(ms->m_header.h_nodeid),
4136 			  le32_to_cpu(ms->m_lkid));
4137 		error = -ENOENT;
4138 		dlm_put_lkb(lkb);
4139 		goto fail;
4140 	}
4141 
4142 	r = lkb->lkb_resource;
4143 
4144 	hold_rsb(r);
4145 	lock_rsb(r);
4146 
4147 	error = validate_message(lkb, ms);
4148 	if (error)
4149 		goto out;
4150 
4151 	receive_flags(lkb, ms);
4152 
4153 	error = receive_unlock_args(ls, lkb, ms);
4154 	if (error) {
4155 		send_unlock_reply(r, lkb, error);
4156 		goto out;
4157 	}
4158 
4159 	error = do_unlock(r, lkb);
4160 	send_unlock_reply(r, lkb, error);
4161 	do_unlock_effects(r, lkb, error);
4162  out:
4163 	unlock_rsb(r);
4164 	put_rsb(r);
4165 	dlm_put_lkb(lkb);
4166 	return 0;
4167 
4168  fail:
4169 	setup_local_lkb(ls, ms);
4170 	send_unlock_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4171 	return error;
4172 }
4173 
4174 static int receive_cancel(struct dlm_ls *ls, const struct dlm_message *ms)
4175 {
4176 	struct dlm_lkb *lkb;
4177 	struct dlm_rsb *r;
4178 	int error;
4179 
4180 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4181 	if (error)
4182 		goto fail;
4183 
4184 	receive_flags(lkb, ms);
4185 
4186 	r = lkb->lkb_resource;
4187 
4188 	hold_rsb(r);
4189 	lock_rsb(r);
4190 
4191 	error = validate_message(lkb, ms);
4192 	if (error)
4193 		goto out;
4194 
4195 	error = do_cancel(r, lkb);
4196 	send_cancel_reply(r, lkb, error);
4197 	do_cancel_effects(r, lkb, error);
4198  out:
4199 	unlock_rsb(r);
4200 	put_rsb(r);
4201 	dlm_put_lkb(lkb);
4202 	return 0;
4203 
4204  fail:
4205 	setup_local_lkb(ls, ms);
4206 	send_cancel_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4207 	return error;
4208 }
4209 
4210 static int receive_grant(struct dlm_ls *ls, const struct dlm_message *ms)
4211 {
4212 	struct dlm_lkb *lkb;
4213 	struct dlm_rsb *r;
4214 	int error;
4215 
4216 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4217 	if (error)
4218 		return error;
4219 
4220 	r = lkb->lkb_resource;
4221 
4222 	hold_rsb(r);
4223 	lock_rsb(r);
4224 
4225 	error = validate_message(lkb, ms);
4226 	if (error)
4227 		goto out;
4228 
4229 	receive_flags_reply(lkb, ms, false);
4230 	if (is_altmode(lkb))
4231 		munge_altmode(lkb, ms);
4232 	grant_lock_pc(r, lkb, ms);
4233 	queue_cast(r, lkb, 0);
4234  out:
4235 	unlock_rsb(r);
4236 	put_rsb(r);
4237 	dlm_put_lkb(lkb);
4238 	return 0;
4239 }
4240 
4241 static int receive_bast(struct dlm_ls *ls, const struct dlm_message *ms)
4242 {
4243 	struct dlm_lkb *lkb;
4244 	struct dlm_rsb *r;
4245 	int error;
4246 
4247 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4248 	if (error)
4249 		return error;
4250 
4251 	r = lkb->lkb_resource;
4252 
4253 	hold_rsb(r);
4254 	lock_rsb(r);
4255 
4256 	error = validate_message(lkb, ms);
4257 	if (error)
4258 		goto out;
4259 
4260 	queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode));
4261 	lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
4262  out:
4263 	unlock_rsb(r);
4264 	put_rsb(r);
4265 	dlm_put_lkb(lkb);
4266 	return 0;
4267 }
4268 
4269 static void receive_lookup(struct dlm_ls *ls, const struct dlm_message *ms)
4270 {
4271 	int len, error, ret_nodeid, from_nodeid, our_nodeid;
4272 
4273 	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4274 	our_nodeid = dlm_our_nodeid();
4275 
4276 	len = receive_extralen(ms);
4277 
4278 	error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4279 				  &ret_nodeid, NULL);
4280 
4281 	/* Optimization: we're master so treat lookup as a request */
4282 	if (!error && ret_nodeid == our_nodeid) {
4283 		receive_request(ls, ms);
4284 		return;
4285 	}
4286 	send_lookup_reply(ls, ms, ret_nodeid, error);
4287 }
4288 
4289 static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
4290 {
4291 	char name[DLM_RESNAME_MAXLEN+1];
4292 	struct dlm_rsb *r;
4293 	int rv, len, dir_nodeid, from_nodeid;
4294 
4295 	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4296 
4297 	len = receive_extralen(ms);
4298 
4299 	if (len > DLM_RESNAME_MAXLEN) {
4300 		log_error(ls, "receive_remove from %d bad len %d",
4301 			  from_nodeid, len);
4302 		return;
4303 	}
4304 
4305 	dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash));
4306 	if (dir_nodeid != dlm_our_nodeid()) {
4307 		log_error(ls, "receive_remove from %d bad nodeid %d",
4308 			  from_nodeid, dir_nodeid);
4309 		return;
4310 	}
4311 
4312 	/*
4313 	 * Look for inactive rsb, if it's there, free it.
4314 	 * If the rsb is active, it's being used, and we should ignore this
4315 	 * message.  This is an expected race between the dir node sending a
4316 	 * request to the master node at the same time as the master node sends
4317 	 * a remove to the dir node.  The resolution to that race is for the
4318 	 * dir node to ignore the remove message, and the master node to
4319 	 * recreate the master rsb when it gets a request from the dir node for
4320 	 * an rsb it doesn't have.
4321 	 */
4322 
4323 	memset(name, 0, sizeof(name));
4324 	memcpy(name, ms->m_extra, len);
4325 
4326 	write_lock_bh(&ls->ls_rsbtbl_lock);
4327 
4328 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
4329 	if (rv) {
4330 		/* should not happen */
4331 		log_error(ls, "%s from %d not found %s", __func__,
4332 			  from_nodeid, name);
4333 		write_unlock_bh(&ls->ls_rsbtbl_lock);
4334 		return;
4335 	}
4336 
4337 	if (!rsb_flag(r, RSB_INACTIVE)) {
4338 		if (r->res_master_nodeid != from_nodeid) {
4339 			/* should not happen */
4340 			log_error(ls, "receive_remove on active rsb from %d master %d",
4341 				  from_nodeid, r->res_master_nodeid);
4342 			dlm_print_rsb(r);
4343 			write_unlock_bh(&ls->ls_rsbtbl_lock);
4344 			return;
4345 		}
4346 
4347 		/* Ignore the remove message, see race comment above. */
4348 
4349 		log_debug(ls, "receive_remove from %d master %d first %x %s",
4350 			  from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4351 			  name);
4352 		write_unlock_bh(&ls->ls_rsbtbl_lock);
4353 		return;
4354 	}
4355 
4356 	if (r->res_master_nodeid != from_nodeid) {
4357 		log_error(ls, "receive_remove inactive from %d master %d",
4358 			  from_nodeid, r->res_master_nodeid);
4359 		dlm_print_rsb(r);
4360 		write_unlock_bh(&ls->ls_rsbtbl_lock);
4361 		return;
4362 	}
4363 
4364 	list_del(&r->res_slow_list);
4365 	rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
4366 			       dlm_rhash_rsb_params);
4367 	rsb_clear_flag(r, RSB_HASHED);
4368 	write_unlock_bh(&ls->ls_rsbtbl_lock);
4369 
4370 	free_inactive_rsb(r);
4371 }
4372 
4373 static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms)
4374 {
4375 	do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
4376 }
4377 
4378 static int receive_request_reply(struct dlm_ls *ls,
4379 				 const struct dlm_message *ms)
4380 {
4381 	struct dlm_lkb *lkb;
4382 	struct dlm_rsb *r;
4383 	int error, mstype, result;
4384 	int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4385 
4386 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4387 	if (error)
4388 		return error;
4389 
4390 	r = lkb->lkb_resource;
4391 	hold_rsb(r);
4392 	lock_rsb(r);
4393 
4394 	error = validate_message(lkb, ms);
4395 	if (error)
4396 		goto out;
4397 
4398 	mstype = lkb->lkb_wait_type;
4399 	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4400 	if (error) {
4401 		log_error(ls, "receive_request_reply %x remote %d %x result %d",
4402 			  lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid),
4403 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4404 		dlm_dump_rsb(r);
4405 		goto out;
4406 	}
4407 
4408 	/* Optimization: the dir node was also the master, so it took our
4409 	   lookup as a request and sent request reply instead of lookup reply */
4410 	if (mstype == DLM_MSG_LOOKUP) {
4411 		r->res_master_nodeid = from_nodeid;
4412 		r->res_nodeid = from_nodeid;
4413 		lkb->lkb_nodeid = from_nodeid;
4414 	}
4415 
4416 	/* this is the value returned from do_request() on the master */
4417 	result = from_dlm_errno(le32_to_cpu(ms->m_result));
4418 
4419 	switch (result) {
4420 	case -EAGAIN:
4421 		/* request would block (be queued) on remote master */
4422 		queue_cast(r, lkb, -EAGAIN);
4423 		confirm_master(r, -EAGAIN);
4424 		unhold_lkb(lkb); /* undoes create_lkb() */
4425 		break;
4426 
4427 	case -EINPROGRESS:
4428 	case 0:
4429 		/* request was queued or granted on remote master */
4430 		receive_flags_reply(lkb, ms, false);
4431 		lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
4432 		if (is_altmode(lkb))
4433 			munge_altmode(lkb, ms);
4434 		if (result) {
4435 			add_lkb(r, lkb, DLM_LKSTS_WAITING);
4436 		} else {
4437 			grant_lock_pc(r, lkb, ms);
4438 			queue_cast(r, lkb, 0);
4439 		}
4440 		confirm_master(r, result);
4441 		break;
4442 
4443 	case -EBADR:
4444 	case -ENOTBLK:
4445 		/* find_rsb failed to find rsb or rsb wasn't master */
4446 		log_limit(ls, "receive_request_reply %x from %d %d "
4447 			  "master %d dir %d first %x %s", lkb->lkb_id,
4448 			  from_nodeid, result, r->res_master_nodeid,
4449 			  r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4450 
4451 		if (r->res_dir_nodeid != dlm_our_nodeid() &&
4452 		    r->res_master_nodeid != dlm_our_nodeid()) {
4453 			/* cause _request_lock->set_master->send_lookup */
4454 			r->res_master_nodeid = 0;
4455 			r->res_nodeid = -1;
4456 			lkb->lkb_nodeid = -1;
4457 		}
4458 
4459 		if (is_overlap(lkb)) {
4460 			/* we'll ignore error in cancel/unlock reply */
4461 			queue_cast_overlap(r, lkb);
4462 			confirm_master(r, result);
4463 			unhold_lkb(lkb); /* undoes create_lkb() */
4464 		} else {
4465 			_request_lock(r, lkb);
4466 
4467 			if (r->res_master_nodeid == dlm_our_nodeid())
4468 				confirm_master(r, 0);
4469 		}
4470 		break;
4471 
4472 	default:
4473 		log_error(ls, "receive_request_reply %x error %d",
4474 			  lkb->lkb_id, result);
4475 	}
4476 
4477 	if ((result == 0 || result == -EINPROGRESS) &&
4478 	    test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
4479 		log_debug(ls, "receive_request_reply %x result %d unlock",
4480 			  lkb->lkb_id, result);
4481 		clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4482 		send_unlock(r, lkb);
4483 	} else if ((result == -EINPROGRESS) &&
4484 		   test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
4485 				      &lkb->lkb_iflags)) {
4486 		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4487 		clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4488 		send_cancel(r, lkb);
4489 	} else {
4490 		clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4491 		clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4492 	}
4493  out:
4494 	unlock_rsb(r);
4495 	put_rsb(r);
4496 	dlm_put_lkb(lkb);
4497 	return 0;
4498 }
4499 
4500 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4501 				    const struct dlm_message *ms, bool local)
4502 {
4503 	/* this is the value returned from do_convert() on the master */
4504 	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4505 	case -EAGAIN:
4506 		/* convert would block (be queued) on remote master */
4507 		queue_cast(r, lkb, -EAGAIN);
4508 		break;
4509 
4510 	case -EDEADLK:
4511 		receive_flags_reply(lkb, ms, local);
4512 		revert_lock_pc(r, lkb);
4513 		queue_cast(r, lkb, -EDEADLK);
4514 		break;
4515 
4516 	case -EINPROGRESS:
4517 		/* convert was queued on remote master */
4518 		receive_flags_reply(lkb, ms, local);
4519 		if (is_demoted(lkb))
4520 			munge_demoted(lkb);
4521 		del_lkb(r, lkb);
4522 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4523 		break;
4524 
4525 	case 0:
4526 		/* convert was granted on remote master */
4527 		receive_flags_reply(lkb, ms, local);
4528 		if (is_demoted(lkb))
4529 			munge_demoted(lkb);
4530 		grant_lock_pc(r, lkb, ms);
4531 		queue_cast(r, lkb, 0);
4532 		break;
4533 
4534 	default:
4535 		log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4536 			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4537 			  le32_to_cpu(ms->m_lkid),
4538 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4539 		dlm_print_rsb(r);
4540 		dlm_print_lkb(lkb);
4541 	}
4542 }
4543 
4544 static void _receive_convert_reply(struct dlm_lkb *lkb,
4545 				   const struct dlm_message *ms, bool local)
4546 {
4547 	struct dlm_rsb *r = lkb->lkb_resource;
4548 	int error;
4549 
4550 	hold_rsb(r);
4551 	lock_rsb(r);
4552 
4553 	error = validate_message(lkb, ms);
4554 	if (error)
4555 		goto out;
4556 
4557 	error = remove_from_waiters_ms(lkb, ms, local);
4558 	if (error)
4559 		goto out;
4560 
4561 	__receive_convert_reply(r, lkb, ms, local);
4562  out:
4563 	unlock_rsb(r);
4564 	put_rsb(r);
4565 }
4566 
4567 static int receive_convert_reply(struct dlm_ls *ls,
4568 				 const struct dlm_message *ms)
4569 {
4570 	struct dlm_lkb *lkb;
4571 	int error;
4572 
4573 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4574 	if (error)
4575 		return error;
4576 
4577 	_receive_convert_reply(lkb, ms, false);
4578 	dlm_put_lkb(lkb);
4579 	return 0;
4580 }
4581 
4582 static void _receive_unlock_reply(struct dlm_lkb *lkb,
4583 				  const struct dlm_message *ms, bool local)
4584 {
4585 	struct dlm_rsb *r = lkb->lkb_resource;
4586 	int error;
4587 
4588 	hold_rsb(r);
4589 	lock_rsb(r);
4590 
4591 	error = validate_message(lkb, ms);
4592 	if (error)
4593 		goto out;
4594 
4595 	error = remove_from_waiters_ms(lkb, ms, local);
4596 	if (error)
4597 		goto out;
4598 
4599 	/* this is the value returned from do_unlock() on the master */
4600 
4601 	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4602 	case -DLM_EUNLOCK:
4603 		receive_flags_reply(lkb, ms, local);
4604 		remove_lock_pc(r, lkb);
4605 		queue_cast(r, lkb, -DLM_EUNLOCK);
4606 		break;
4607 	case -ENOENT:
4608 		break;
4609 	default:
4610 		log_error(r->res_ls, "receive_unlock_reply %x error %d",
4611 			  lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
4612 	}
4613  out:
4614 	unlock_rsb(r);
4615 	put_rsb(r);
4616 }
4617 
4618 static int receive_unlock_reply(struct dlm_ls *ls,
4619 				const struct dlm_message *ms)
4620 {
4621 	struct dlm_lkb *lkb;
4622 	int error;
4623 
4624 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4625 	if (error)
4626 		return error;
4627 
4628 	_receive_unlock_reply(lkb, ms, false);
4629 	dlm_put_lkb(lkb);
4630 	return 0;
4631 }
4632 
4633 static void _receive_cancel_reply(struct dlm_lkb *lkb,
4634 				  const struct dlm_message *ms, bool local)
4635 {
4636 	struct dlm_rsb *r = lkb->lkb_resource;
4637 	int error;
4638 
4639 	hold_rsb(r);
4640 	lock_rsb(r);
4641 
4642 	error = validate_message(lkb, ms);
4643 	if (error)
4644 		goto out;
4645 
4646 	error = remove_from_waiters_ms(lkb, ms, local);
4647 	if (error)
4648 		goto out;
4649 
4650 	/* this is the value returned from do_cancel() on the master */
4651 
4652 	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4653 	case -DLM_ECANCEL:
4654 		receive_flags_reply(lkb, ms, local);
4655 		revert_lock_pc(r, lkb);
4656 		queue_cast(r, lkb, -DLM_ECANCEL);
4657 		break;
4658 	case 0:
4659 		break;
4660 	default:
4661 		log_error(r->res_ls, "receive_cancel_reply %x error %d",
4662 			  lkb->lkb_id,
4663 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4664 	}
4665  out:
4666 	unlock_rsb(r);
4667 	put_rsb(r);
4668 }
4669 
4670 static int receive_cancel_reply(struct dlm_ls *ls,
4671 				const struct dlm_message *ms)
4672 {
4673 	struct dlm_lkb *lkb;
4674 	int error;
4675 
4676 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4677 	if (error)
4678 		return error;
4679 
4680 	_receive_cancel_reply(lkb, ms, false);
4681 	dlm_put_lkb(lkb);
4682 	return 0;
4683 }
4684 
4685 static void receive_lookup_reply(struct dlm_ls *ls,
4686 				 const struct dlm_message *ms)
4687 {
4688 	struct dlm_lkb *lkb;
4689 	struct dlm_rsb *r;
4690 	int error, ret_nodeid;
4691 	int do_lookup_list = 0;
4692 
4693 	error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
4694 	if (error) {
4695 		log_error(ls, "%s no lkid %x", __func__,
4696 			  le32_to_cpu(ms->m_lkid));
4697 		return;
4698 	}
4699 
4700 	/* ms->m_result is the value returned by dlm_master_lookup on dir node
4701 	   FIXME: will a non-zero error ever be returned? */
4702 
4703 	r = lkb->lkb_resource;
4704 	hold_rsb(r);
4705 	lock_rsb(r);
4706 
4707 	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4708 	if (error)
4709 		goto out;
4710 
4711 	ret_nodeid = le32_to_cpu(ms->m_nodeid);
4712 
4713 	/* We sometimes receive a request from the dir node for this
4714 	   rsb before we've received the dir node's loookup_reply for it.
4715 	   The request from the dir node implies we're the master, so we set
4716 	   ourself as master in receive_request_reply, and verify here that
4717 	   we are indeed the master. */
4718 
4719 	if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4720 		/* This should never happen */
4721 		log_error(ls, "receive_lookup_reply %x from %d ret %d "
4722 			  "master %d dir %d our %d first %x %s",
4723 			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4724 			  ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
4725 			  dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4726 	}
4727 
4728 	if (ret_nodeid == dlm_our_nodeid()) {
4729 		r->res_master_nodeid = ret_nodeid;
4730 		r->res_nodeid = 0;
4731 		do_lookup_list = 1;
4732 		r->res_first_lkid = 0;
4733 	} else if (ret_nodeid == -1) {
4734 		/* the remote node doesn't believe it's the dir node */
4735 		log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4736 			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
4737 		r->res_master_nodeid = 0;
4738 		r->res_nodeid = -1;
4739 		lkb->lkb_nodeid = -1;
4740 	} else {
4741 		/* set_master() will set lkb_nodeid from r */
4742 		r->res_master_nodeid = ret_nodeid;
4743 		r->res_nodeid = ret_nodeid;
4744 	}
4745 
4746 	if (is_overlap(lkb)) {
4747 		log_debug(ls, "receive_lookup_reply %x unlock %x",
4748 			  lkb->lkb_id, dlm_iflags_val(lkb));
4749 		queue_cast_overlap(r, lkb);
4750 		unhold_lkb(lkb); /* undoes create_lkb() */
4751 		goto out_list;
4752 	}
4753 
4754 	_request_lock(r, lkb);
4755 
4756  out_list:
4757 	if (do_lookup_list)
4758 		process_lookup_list(r);
4759  out:
4760 	unlock_rsb(r);
4761 	put_rsb(r);
4762 	dlm_put_lkb(lkb);
4763 }
4764 
4765 static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4766 			     uint32_t saved_seq)
4767 {
4768 	int error = 0, noent = 0;
4769 
4770 	if (WARN_ON_ONCE(!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid)))) {
4771 		log_limit(ls, "receive %d from non-member %d %x %x %d",
4772 			  le32_to_cpu(ms->m_type),
4773 			  le32_to_cpu(ms->m_header.h_nodeid),
4774 			  le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4775 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4776 		return;
4777 	}
4778 
4779 	switch (ms->m_type) {
4780 
4781 	/* messages sent to a master node */
4782 
4783 	case cpu_to_le32(DLM_MSG_REQUEST):
4784 		error = receive_request(ls, ms);
4785 		break;
4786 
4787 	case cpu_to_le32(DLM_MSG_CONVERT):
4788 		error = receive_convert(ls, ms);
4789 		break;
4790 
4791 	case cpu_to_le32(DLM_MSG_UNLOCK):
4792 		error = receive_unlock(ls, ms);
4793 		break;
4794 
4795 	case cpu_to_le32(DLM_MSG_CANCEL):
4796 		noent = 1;
4797 		error = receive_cancel(ls, ms);
4798 		break;
4799 
4800 	/* messages sent from a master node (replies to above) */
4801 
4802 	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
4803 		error = receive_request_reply(ls, ms);
4804 		break;
4805 
4806 	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
4807 		error = receive_convert_reply(ls, ms);
4808 		break;
4809 
4810 	case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
4811 		error = receive_unlock_reply(ls, ms);
4812 		break;
4813 
4814 	case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
4815 		error = receive_cancel_reply(ls, ms);
4816 		break;
4817 
4818 	/* messages sent from a master node (only two types of async msg) */
4819 
4820 	case cpu_to_le32(DLM_MSG_GRANT):
4821 		noent = 1;
4822 		error = receive_grant(ls, ms);
4823 		break;
4824 
4825 	case cpu_to_le32(DLM_MSG_BAST):
4826 		noent = 1;
4827 		error = receive_bast(ls, ms);
4828 		break;
4829 
4830 	/* messages sent to a dir node */
4831 
4832 	case cpu_to_le32(DLM_MSG_LOOKUP):
4833 		receive_lookup(ls, ms);
4834 		break;
4835 
4836 	case cpu_to_le32(DLM_MSG_REMOVE):
4837 		receive_remove(ls, ms);
4838 		break;
4839 
4840 	/* messages sent from a dir node (remove has no reply) */
4841 
4842 	case cpu_to_le32(DLM_MSG_LOOKUP_REPLY):
4843 		receive_lookup_reply(ls, ms);
4844 		break;
4845 
4846 	/* other messages */
4847 
4848 	case cpu_to_le32(DLM_MSG_PURGE):
4849 		receive_purge(ls, ms);
4850 		break;
4851 
4852 	default:
4853 		log_error(ls, "unknown message type %d",
4854 			  le32_to_cpu(ms->m_type));
4855 	}
4856 
4857 	/*
4858 	 * When checking for ENOENT, we're checking the result of
4859 	 * find_lkb(m_remid):
4860 	 *
4861 	 * The lock id referenced in the message wasn't found.  This may
4862 	 * happen in normal usage for the async messages and cancel, so
4863 	 * only use log_debug for them.
4864 	 *
4865 	 * Some errors are expected and normal.
4866 	 */
4867 
4868 	if (error == -ENOENT && noent) {
4869 		log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4870 			  le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4871 			  le32_to_cpu(ms->m_header.h_nodeid),
4872 			  le32_to_cpu(ms->m_lkid), saved_seq);
4873 	} else if (error == -ENOENT) {
4874 		log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4875 			  le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4876 			  le32_to_cpu(ms->m_header.h_nodeid),
4877 			  le32_to_cpu(ms->m_lkid), saved_seq);
4878 
4879 		if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT))
4880 			dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash));
4881 	}
4882 
4883 	if (error == -EINVAL) {
4884 		log_error(ls, "receive %d inval from %d lkid %x remid %x "
4885 			  "saved_seq %u",
4886 			  le32_to_cpu(ms->m_type),
4887 			  le32_to_cpu(ms->m_header.h_nodeid),
4888 			  le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4889 			  saved_seq);
4890 	}
4891 }
4892 
4893 /* If the lockspace is in recovery mode (locking stopped), then normal
4894    messages are saved on the requestqueue for processing after recovery is
4895    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4896    messages off the requestqueue before we process new ones. This occurs right
4897    after recovery completes when we transition from saving all messages on
4898    requestqueue, to processing all the saved messages, to processing new
4899    messages as they arrive. */
4900 
4901 static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4902 				int nodeid)
4903 {
4904 try_again:
4905 	read_lock_bh(&ls->ls_requestqueue_lock);
4906 	if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4907 		/* If we were a member of this lockspace, left, and rejoined,
4908 		   other nodes may still be sending us messages from the
4909 		   lockspace generation before we left. */
4910 		if (WARN_ON_ONCE(!ls->ls_generation)) {
4911 			read_unlock_bh(&ls->ls_requestqueue_lock);
4912 			log_limit(ls, "receive %d from %d ignore old gen",
4913 				  le32_to_cpu(ms->m_type), nodeid);
4914 			return;
4915 		}
4916 
4917 		read_unlock_bh(&ls->ls_requestqueue_lock);
4918 		write_lock_bh(&ls->ls_requestqueue_lock);
4919 		/* recheck because we hold writelock now */
4920 		if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4921 			write_unlock_bh(&ls->ls_requestqueue_lock);
4922 			goto try_again;
4923 		}
4924 
4925 		dlm_add_requestqueue(ls, nodeid, ms);
4926 		write_unlock_bh(&ls->ls_requestqueue_lock);
4927 	} else {
4928 		_receive_message(ls, ms, 0);
4929 		read_unlock_bh(&ls->ls_requestqueue_lock);
4930 	}
4931 }
4932 
4933 /* This is called by dlm_recoverd to process messages that were saved on
4934    the requestqueue. */
4935 
4936 void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
4937 			       uint32_t saved_seq)
4938 {
4939 	_receive_message(ls, ms, saved_seq);
4940 }
4941 
4942 /* This is called by the midcomms layer when something is received for
4943    the lockspace.  It could be either a MSG (normal message sent as part of
4944    standard locking activity) or an RCOM (recovery message sent as part of
4945    lockspace recovery). */
4946 
4947 void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
4948 {
4949 	const struct dlm_header *hd = &p->header;
4950 	struct dlm_ls *ls;
4951 	int type = 0;
4952 
4953 	switch (hd->h_cmd) {
4954 	case DLM_MSG:
4955 		type = le32_to_cpu(p->message.m_type);
4956 		break;
4957 	case DLM_RCOM:
4958 		type = le32_to_cpu(p->rcom.rc_type);
4959 		break;
4960 	default:
4961 		log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4962 		return;
4963 	}
4964 
4965 	if (le32_to_cpu(hd->h_nodeid) != nodeid) {
4966 		log_print("invalid h_nodeid %d from %d lockspace %x",
4967 			  le32_to_cpu(hd->h_nodeid), nodeid,
4968 			  le32_to_cpu(hd->u.h_lockspace));
4969 		return;
4970 	}
4971 
4972 	ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace));
4973 	if (!ls) {
4974 		if (dlm_config.ci_log_debug) {
4975 			printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
4976 				"%u from %d cmd %d type %d\n",
4977 				le32_to_cpu(hd->u.h_lockspace), nodeid,
4978 				hd->h_cmd, type);
4979 		}
4980 
4981 		if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4982 			dlm_send_ls_not_ready(nodeid, &p->rcom);
4983 		return;
4984 	}
4985 
4986 	/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4987 	   be inactive (in this ls) before transitioning to recovery mode */
4988 
4989 	read_lock_bh(&ls->ls_recv_active);
4990 	if (hd->h_cmd == DLM_MSG)
4991 		dlm_receive_message(ls, &p->message, nodeid);
4992 	else if (hd->h_cmd == DLM_RCOM)
4993 		dlm_receive_rcom(ls, &p->rcom, nodeid);
4994 	else
4995 		log_error(ls, "invalid h_cmd %d from %d lockspace %x",
4996 			  hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
4997 	read_unlock_bh(&ls->ls_recv_active);
4998 
4999 	dlm_put_lockspace(ls);
5000 }
5001 
5002 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
5003 				   struct dlm_message *ms_local)
5004 {
5005 	if (middle_conversion(lkb)) {
5006 		hold_lkb(lkb);
5007 		memset(ms_local, 0, sizeof(struct dlm_message));
5008 		ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
5009 		ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
5010 		ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5011 		_receive_convert_reply(lkb, ms_local, true);
5012 
5013 		/* Same special case as in receive_rcom_lock_args() */
5014 		lkb->lkb_grmode = DLM_LOCK_IV;
5015 		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
5016 		unhold_lkb(lkb);
5017 
5018 	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
5019 		set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5020 	}
5021 
5022 	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5023 	   conversions are async; there's no reply from the remote master */
5024 }
5025 
5026 /* A waiting lkb needs recovery if the master node has failed, or
5027    the master node is changing (only when no directory is used) */
5028 
5029 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5030 				 int dir_nodeid)
5031 {
5032 	if (dlm_no_directory(ls))
5033 		return 1;
5034 
5035 	if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5036 		return 1;
5037 
5038 	return 0;
5039 }
5040 
5041 /* Recovery for locks that are waiting for replies from nodes that are now
5042    gone.  We can just complete unlocks and cancels by faking a reply from the
5043    dead node.  Requests and up-conversions we flag to be resent after
5044    recovery.  Down-conversions can just be completed with a fake reply like
5045    unlocks.  Conversions between PR and CW need special attention. */
5046 
5047 void dlm_recover_waiters_pre(struct dlm_ls *ls)
5048 {
5049 	struct dlm_lkb *lkb, *safe;
5050 	struct dlm_message *ms_local;
5051 	int wait_type, local_unlock_result, local_cancel_result;
5052 	int dir_nodeid;
5053 
5054 	ms_local = kmalloc(sizeof(*ms_local), GFP_KERNEL);
5055 	if (!ms_local)
5056 		return;
5057 
5058 	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5059 
5060 		dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5061 
5062 		/* exclude debug messages about unlocks because there can be so
5063 		   many and they aren't very interesting */
5064 
5065 		if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5066 			log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5067 				  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5068 				  lkb->lkb_id,
5069 				  lkb->lkb_remid,
5070 				  lkb->lkb_wait_type,
5071 				  lkb->lkb_resource->res_nodeid,
5072 				  lkb->lkb_nodeid,
5073 				  lkb->lkb_wait_nodeid,
5074 				  dir_nodeid);
5075 		}
5076 
5077 		/* all outstanding lookups, regardless of destination  will be
5078 		   resent after recovery is done */
5079 
5080 		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5081 			set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5082 			continue;
5083 		}
5084 
5085 		if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5086 			continue;
5087 
5088 		wait_type = lkb->lkb_wait_type;
5089 		local_unlock_result = -DLM_EUNLOCK;
5090 		local_cancel_result = -DLM_ECANCEL;
5091 
5092 		/* Main reply may have been received leaving a zero wait_type,
5093 		   but a reply for the overlapping op may not have been
5094 		   received.  In that case we need to fake the appropriate
5095 		   reply for the overlap op. */
5096 
5097 		if (!wait_type) {
5098 			if (is_overlap_cancel(lkb)) {
5099 				wait_type = DLM_MSG_CANCEL;
5100 				if (lkb->lkb_grmode == DLM_LOCK_IV)
5101 					local_cancel_result = 0;
5102 			}
5103 			if (is_overlap_unlock(lkb)) {
5104 				wait_type = DLM_MSG_UNLOCK;
5105 				if (lkb->lkb_grmode == DLM_LOCK_IV)
5106 					local_unlock_result = -ENOENT;
5107 			}
5108 
5109 			log_debug(ls, "rwpre overlap %x %x %d %d %d",
5110 				  lkb->lkb_id, dlm_iflags_val(lkb), wait_type,
5111 				  local_cancel_result, local_unlock_result);
5112 		}
5113 
5114 		switch (wait_type) {
5115 
5116 		case DLM_MSG_REQUEST:
5117 			set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5118 			break;
5119 
5120 		case DLM_MSG_CONVERT:
5121 			recover_convert_waiter(ls, lkb, ms_local);
5122 			break;
5123 
5124 		case DLM_MSG_UNLOCK:
5125 			hold_lkb(lkb);
5126 			memset(ms_local, 0, sizeof(struct dlm_message));
5127 			ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
5128 			ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result));
5129 			ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5130 			_receive_unlock_reply(lkb, ms_local, true);
5131 			dlm_put_lkb(lkb);
5132 			break;
5133 
5134 		case DLM_MSG_CANCEL:
5135 			hold_lkb(lkb);
5136 			memset(ms_local, 0, sizeof(struct dlm_message));
5137 			ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
5138 			ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result));
5139 			ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5140 			_receive_cancel_reply(lkb, ms_local, true);
5141 			dlm_put_lkb(lkb);
5142 			break;
5143 
5144 		default:
5145 			log_error(ls, "invalid lkb wait_type %d %d",
5146 				  lkb->lkb_wait_type, wait_type);
5147 		}
5148 		schedule();
5149 	}
5150 	kfree(ms_local);
5151 }
5152 
5153 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5154 {
5155 	struct dlm_lkb *lkb = NULL, *iter;
5156 
5157 	spin_lock_bh(&ls->ls_waiters_lock);
5158 	list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
5159 		if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
5160 			hold_lkb(iter);
5161 			lkb = iter;
5162 			break;
5163 		}
5164 	}
5165 	spin_unlock_bh(&ls->ls_waiters_lock);
5166 
5167 	return lkb;
5168 }
5169 
5170 /*
5171  * Forced state reset for locks that were in the middle of remote operations
5172  * when recovery happened (i.e. lkbs that were on the waiters list, waiting
5173  * for a reply from a remote operation.)  The lkbs remaining on the waiters
5174  * list need to be reevaluated; some may need resending to a different node
5175  * than previously, and some may now need local handling rather than remote.
5176  *
5177  * First, the lkb state for the voided remote operation is forcibly reset,
5178  * equivalent to what remove_from_waiters() would normally do:
5179  * . lkb removed from ls_waiters list
5180  * . lkb wait_type cleared
5181  * . lkb waiters_count cleared
5182  * . lkb ref count decremented for each waiters_count (almost always 1,
5183  *   but possibly 2 in case of cancel/unlock overlapping, which means
5184  *   two remote replies were being expected for the lkb.)
5185  *
5186  * Second, the lkb is reprocessed like an original operation would be,
5187  * by passing it to _request_lock or _convert_lock, which will either
5188  * process the lkb operation locally, or send it to a remote node again
5189  * and put the lkb back onto the waiters list.
5190  *
5191  * When reprocessing the lkb, we may find that it's flagged for an overlapping
5192  * force-unlock or cancel, either from before recovery began, or after recovery
5193  * finished.  If this is the case, the unlock/cancel is done directly, and the
5194  * original operation is not initiated again (no _request_lock/_convert_lock.)
5195  */
5196 
5197 int dlm_recover_waiters_post(struct dlm_ls *ls)
5198 {
5199 	struct dlm_lkb *lkb;
5200 	struct dlm_rsb *r;
5201 	int error = 0, mstype, err, oc, ou;
5202 
5203 	while (1) {
5204 		if (dlm_locking_stopped(ls)) {
5205 			log_debug(ls, "recover_waiters_post aborted");
5206 			error = -EINTR;
5207 			break;
5208 		}
5209 
5210 		/*
5211 		 * Find an lkb from the waiters list that's been affected by
5212 		 * recovery node changes, and needs to be reprocessed.  Does
5213 		 * hold_lkb(), adding a refcount.
5214 		 */
5215 		lkb = find_resend_waiter(ls);
5216 		if (!lkb)
5217 			break;
5218 
5219 		r = lkb->lkb_resource;
5220 		hold_rsb(r);
5221 		lock_rsb(r);
5222 
5223 		/*
5224 		 * If the lkb has been flagged for a force unlock or cancel,
5225 		 * then the reprocessing below will be replaced by just doing
5226 		 * the unlock/cancel directly.
5227 		 */
5228 		mstype = lkb->lkb_wait_type;
5229 		oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
5230 					&lkb->lkb_iflags);
5231 		ou = test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT,
5232 					&lkb->lkb_iflags);
5233 		err = 0;
5234 
5235 		log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5236 			  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5237 			  "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5238 			  r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5239 			  dlm_dir_nodeid(r), oc, ou);
5240 
5241 		/*
5242 		 * No reply to the pre-recovery operation will now be received,
5243 		 * so a forced equivalent of remove_from_waiters() is needed to
5244 		 * reset the waiters state that was in place before recovery.
5245 		 */
5246 
5247 		clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5248 
5249 		/* Forcibly clear wait_type */
5250 		lkb->lkb_wait_type = 0;
5251 
5252 		/*
5253 		 * Forcibly reset wait_count and associated refcount.  The
5254 		 * wait_count will almost always be 1, but in case of an
5255 		 * overlapping unlock/cancel it could be 2: see where
5256 		 * add_to_waiters() finds the lkb is already on the waiters
5257 		 * list and does lkb_wait_count++; hold_lkb().
5258 		 */
5259 		while (lkb->lkb_wait_count) {
5260 			lkb->lkb_wait_count--;
5261 			unhold_lkb(lkb);
5262 		}
5263 
5264 		/* Forcibly remove from waiters list */
5265 		spin_lock_bh(&ls->ls_waiters_lock);
5266 		list_del_init(&lkb->lkb_wait_reply);
5267 		spin_unlock_bh(&ls->ls_waiters_lock);
5268 
5269 		/*
5270 		 * The lkb is now clear of all prior waiters state and can be
5271 		 * processed locally, or sent to remote node again, or directly
5272 		 * cancelled/unlocked.
5273 		 */
5274 
5275 		if (oc || ou) {
5276 			/* do an unlock or cancel instead of resending */
5277 			switch (mstype) {
5278 			case DLM_MSG_LOOKUP:
5279 			case DLM_MSG_REQUEST:
5280 				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5281 							-DLM_ECANCEL);
5282 				unhold_lkb(lkb); /* undoes create_lkb() */
5283 				break;
5284 			case DLM_MSG_CONVERT:
5285 				if (oc) {
5286 					queue_cast(r, lkb, -DLM_ECANCEL);
5287 				} else {
5288 					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5289 					_unlock_lock(r, lkb);
5290 				}
5291 				break;
5292 			default:
5293 				err = 1;
5294 			}
5295 		} else {
5296 			switch (mstype) {
5297 			case DLM_MSG_LOOKUP:
5298 			case DLM_MSG_REQUEST:
5299 				_request_lock(r, lkb);
5300 				if (is_master(r))
5301 					confirm_master(r, 0);
5302 				break;
5303 			case DLM_MSG_CONVERT:
5304 				_convert_lock(r, lkb);
5305 				break;
5306 			default:
5307 				err = 1;
5308 			}
5309 		}
5310 
5311 		if (err) {
5312 			log_error(ls, "waiter %x msg %d r_nodeid %d "
5313 				  "dir_nodeid %d overlap %d %d",
5314 				  lkb->lkb_id, mstype, r->res_nodeid,
5315 				  dlm_dir_nodeid(r), oc, ou);
5316 		}
5317 		unlock_rsb(r);
5318 		put_rsb(r);
5319 		dlm_put_lkb(lkb);
5320 	}
5321 
5322 	return error;
5323 }
5324 
5325 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5326 			      struct list_head *list)
5327 {
5328 	struct dlm_lkb *lkb, *safe;
5329 
5330 	list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5331 		if (!is_master_copy(lkb))
5332 			continue;
5333 
5334 		/* don't purge lkbs we've added in recover_master_copy for
5335 		   the current recovery seq */
5336 
5337 		if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5338 			continue;
5339 
5340 		del_lkb(r, lkb);
5341 
5342 		/* this put should free the lkb */
5343 		if (!dlm_put_lkb(lkb))
5344 			log_error(ls, "purged mstcpy lkb not released");
5345 	}
5346 }
5347 
5348 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5349 {
5350 	struct dlm_ls *ls = r->res_ls;
5351 
5352 	purge_mstcpy_list(ls, r, &r->res_grantqueue);
5353 	purge_mstcpy_list(ls, r, &r->res_convertqueue);
5354 	purge_mstcpy_list(ls, r, &r->res_waitqueue);
5355 }
5356 
5357 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5358 			    struct list_head *list,
5359 			    int nodeid_gone, unsigned int *count)
5360 {
5361 	struct dlm_lkb *lkb, *safe;
5362 
5363 	list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5364 		if (!is_master_copy(lkb))
5365 			continue;
5366 
5367 		if ((lkb->lkb_nodeid == nodeid_gone) ||
5368 		    dlm_is_removed(ls, lkb->lkb_nodeid)) {
5369 
5370 			/* tell recover_lvb to invalidate the lvb
5371 			   because a node holding EX/PW failed */
5372 			if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5373 			    (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5374 				rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5375 			}
5376 
5377 			del_lkb(r, lkb);
5378 
5379 			/* this put should free the lkb */
5380 			if (!dlm_put_lkb(lkb))
5381 				log_error(ls, "purged dead lkb not released");
5382 
5383 			rsb_set_flag(r, RSB_RECOVER_GRANT);
5384 
5385 			(*count)++;
5386 		}
5387 	}
5388 }
5389 
5390 /* Get rid of locks held by nodes that are gone. */
5391 
5392 void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list)
5393 {
5394 	struct dlm_rsb *r;
5395 	struct dlm_member *memb;
5396 	int nodes_count = 0;
5397 	int nodeid_gone = 0;
5398 	unsigned int lkb_count = 0;
5399 
5400 	/* cache one removed nodeid to optimize the common
5401 	   case of a single node removed */
5402 
5403 	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5404 		nodes_count++;
5405 		nodeid_gone = memb->nodeid;
5406 	}
5407 
5408 	if (!nodes_count)
5409 		return;
5410 
5411 	list_for_each_entry(r, root_list, res_root_list) {
5412 		hold_rsb(r);
5413 		lock_rsb(r);
5414 		if (is_master(r)) {
5415 			purge_dead_list(ls, r, &r->res_grantqueue,
5416 					nodeid_gone, &lkb_count);
5417 			purge_dead_list(ls, r, &r->res_convertqueue,
5418 					nodeid_gone, &lkb_count);
5419 			purge_dead_list(ls, r, &r->res_waitqueue,
5420 					nodeid_gone, &lkb_count);
5421 		}
5422 		unlock_rsb(r);
5423 		unhold_rsb(r);
5424 		cond_resched();
5425 	}
5426 
5427 	if (lkb_count)
5428 		log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5429 			  lkb_count, nodes_count);
5430 }
5431 
5432 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
5433 {
5434 	struct dlm_rsb *r;
5435 
5436 	read_lock_bh(&ls->ls_rsbtbl_lock);
5437 	list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
5438 		if (!rsb_flag(r, RSB_RECOVER_GRANT))
5439 			continue;
5440 		if (!is_master(r)) {
5441 			rsb_clear_flag(r, RSB_RECOVER_GRANT);
5442 			continue;
5443 		}
5444 		hold_rsb(r);
5445 		read_unlock_bh(&ls->ls_rsbtbl_lock);
5446 		return r;
5447 	}
5448 	read_unlock_bh(&ls->ls_rsbtbl_lock);
5449 	return NULL;
5450 }
5451 
5452 /*
5453  * Attempt to grant locks on resources that we are the master of.
5454  * Locks may have become grantable during recovery because locks
5455  * from departed nodes have been purged (or not rebuilt), allowing
5456  * previously blocked locks to now be granted.  The subset of rsb's
5457  * we are interested in are those with lkb's on either the convert or
5458  * waiting queues.
5459  *
5460  * Simplest would be to go through each master rsb and check for non-empty
5461  * convert or waiting queues, and attempt to grant on those rsbs.
5462  * Checking the queues requires lock_rsb, though, for which we'd need
5463  * to release the rsbtbl lock.  This would make iterating through all
5464  * rsb's very inefficient.  So, we rely on earlier recovery routines
5465  * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5466  * locks for.
5467  */
5468 
5469 void dlm_recover_grant(struct dlm_ls *ls)
5470 {
5471 	struct dlm_rsb *r;
5472 	unsigned int count = 0;
5473 	unsigned int rsb_count = 0;
5474 	unsigned int lkb_count = 0;
5475 
5476 	while (1) {
5477 		r = find_grant_rsb(ls);
5478 		if (!r)
5479 			break;
5480 
5481 		rsb_count++;
5482 		count = 0;
5483 		lock_rsb(r);
5484 		/* the RECOVER_GRANT flag is checked in the grant path */
5485 		grant_pending_locks(r, &count);
5486 		rsb_clear_flag(r, RSB_RECOVER_GRANT);
5487 		lkb_count += count;
5488 		confirm_master(r, 0);
5489 		unlock_rsb(r);
5490 		put_rsb(r);
5491 		cond_resched();
5492 	}
5493 
5494 	if (lkb_count)
5495 		log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5496 			  lkb_count, rsb_count);
5497 }
5498 
5499 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5500 					 uint32_t remid)
5501 {
5502 	struct dlm_lkb *lkb;
5503 
5504 	list_for_each_entry(lkb, head, lkb_statequeue) {
5505 		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5506 			return lkb;
5507 	}
5508 	return NULL;
5509 }
5510 
5511 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5512 				    uint32_t remid)
5513 {
5514 	struct dlm_lkb *lkb;
5515 
5516 	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5517 	if (lkb)
5518 		return lkb;
5519 	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5520 	if (lkb)
5521 		return lkb;
5522 	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5523 	if (lkb)
5524 		return lkb;
5525 	return NULL;
5526 }
5527 
5528 /* needs at least dlm_rcom + rcom_lock */
5529 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5530 				  struct dlm_rsb *r, const struct dlm_rcom *rc)
5531 {
5532 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5533 
5534 	lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5535 	lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5536 	lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5537 	lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5538 	dlm_set_dflags_val(lkb, le32_to_cpu(rl->rl_flags));
5539 	set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
5540 	lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5541 	lkb->lkb_rqmode = rl->rl_rqmode;
5542 	lkb->lkb_grmode = rl->rl_grmode;
5543 	/* don't set lkb_status because add_lkb wants to itself */
5544 
5545 	lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5546 	lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5547 
5548 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5549 		int lvblen = le16_to_cpu(rc->rc_header.h_length) -
5550 			sizeof(struct dlm_rcom) - sizeof(struct rcom_lock);
5551 		if (lvblen > ls->ls_lvblen)
5552 			return -EINVAL;
5553 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5554 		if (!lkb->lkb_lvbptr)
5555 			return -ENOMEM;
5556 		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5557 	}
5558 
5559 	/* Conversions between PR and CW (middle modes) need special handling.
5560 	   The real granted mode of these converting locks cannot be determined
5561 	   until all locks have been rebuilt on the rsb (recover_conversion) */
5562 
5563 	if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
5564 	    middle_conversion(lkb)) {
5565 		rl->rl_status = DLM_LKSTS_CONVERT;
5566 		lkb->lkb_grmode = DLM_LOCK_IV;
5567 		rsb_set_flag(r, RSB_RECOVER_CONVERT);
5568 	}
5569 
5570 	return 0;
5571 }
5572 
5573 /* This lkb may have been recovered in a previous aborted recovery so we need
5574    to check if the rsb already has an lkb with the given remote nodeid/lkid.
5575    If so we just send back a standard reply.  If not, we create a new lkb with
5576    the given values and send back our lkid.  We send back our lkid by sending
5577    back the rcom_lock struct we got but with the remid field filled in. */
5578 
5579 /* needs at least dlm_rcom + rcom_lock */
5580 int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5581 			    __le32 *rl_remid, __le32 *rl_result)
5582 {
5583 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5584 	struct dlm_rsb *r;
5585 	struct dlm_lkb *lkb;
5586 	uint32_t remid = 0;
5587 	int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5588 	int error;
5589 
5590 	/* init rl_remid with rcom lock rl_remid */
5591 	*rl_remid = rl->rl_remid;
5592 
5593 	if (rl->rl_parent_lkid) {
5594 		error = -EOPNOTSUPP;
5595 		goto out;
5596 	}
5597 
5598 	remid = le32_to_cpu(rl->rl_lkid);
5599 
5600 	/* In general we expect the rsb returned to be R_MASTER, but we don't
5601 	   have to require it.  Recovery of masters on one node can overlap
5602 	   recovery of locks on another node, so one node can send us MSTCPY
5603 	   locks before we've made ourselves master of this rsb.  We can still
5604 	   add new MSTCPY locks that we receive here without any harm; when
5605 	   we make ourselves master, dlm_recover_masters() won't touch the
5606 	   MSTCPY locks we've received early. */
5607 
5608 	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5609 			 from_nodeid, R_RECEIVE_RECOVER, &r);
5610 	if (error)
5611 		goto out;
5612 
5613 	lock_rsb(r);
5614 
5615 	if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5616 		log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5617 			  from_nodeid, remid);
5618 		error = -EBADR;
5619 		goto out_unlock;
5620 	}
5621 
5622 	lkb = search_remid(r, from_nodeid, remid);
5623 	if (lkb) {
5624 		error = -EEXIST;
5625 		goto out_remid;
5626 	}
5627 
5628 	error = create_lkb(ls, &lkb);
5629 	if (error)
5630 		goto out_unlock;
5631 
5632 	error = receive_rcom_lock_args(ls, lkb, r, rc);
5633 	if (error) {
5634 		__put_lkb(ls, lkb);
5635 		goto out_unlock;
5636 	}
5637 
5638 	attach_lkb(r, lkb);
5639 	add_lkb(r, lkb, rl->rl_status);
5640 	ls->ls_recover_locks_in++;
5641 
5642 	if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5643 		rsb_set_flag(r, RSB_RECOVER_GRANT);
5644 
5645  out_remid:
5646 	/* this is the new value returned to the lock holder for
5647 	   saving in its process-copy lkb */
5648 	*rl_remid = cpu_to_le32(lkb->lkb_id);
5649 
5650 	lkb->lkb_recover_seq = ls->ls_recover_seq;
5651 
5652  out_unlock:
5653 	unlock_rsb(r);
5654 	put_rsb(r);
5655  out:
5656 	if (error && error != -EEXIST)
5657 		log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5658 			  from_nodeid, remid, error);
5659 	*rl_result = cpu_to_le32(error);
5660 	return error;
5661 }
5662 
5663 /* needs at least dlm_rcom + rcom_lock */
5664 int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5665 			     uint64_t seq)
5666 {
5667 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5668 	struct dlm_rsb *r;
5669 	struct dlm_lkb *lkb;
5670 	uint32_t lkid, remid;
5671 	int error, result;
5672 
5673 	lkid = le32_to_cpu(rl->rl_lkid);
5674 	remid = le32_to_cpu(rl->rl_remid);
5675 	result = le32_to_cpu(rl->rl_result);
5676 
5677 	error = find_lkb(ls, lkid, &lkb);
5678 	if (error) {
5679 		log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5680 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5681 			  result);
5682 		return error;
5683 	}
5684 
5685 	r = lkb->lkb_resource;
5686 	hold_rsb(r);
5687 	lock_rsb(r);
5688 
5689 	if (!is_process_copy(lkb)) {
5690 		log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5691 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5692 			  result);
5693 		dlm_dump_rsb(r);
5694 		unlock_rsb(r);
5695 		put_rsb(r);
5696 		dlm_put_lkb(lkb);
5697 		return -EINVAL;
5698 	}
5699 
5700 	switch (result) {
5701 	case -EBADR:
5702 		/* There's a chance the new master received our lock before
5703 		   dlm_recover_master_reply(), this wouldn't happen if we did
5704 		   a barrier between recover_masters and recover_locks. */
5705 
5706 		log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5707 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5708 			  result);
5709 
5710 		dlm_send_rcom_lock(r, lkb, seq);
5711 		goto out;
5712 	case -EEXIST:
5713 	case 0:
5714 		lkb->lkb_remid = remid;
5715 		break;
5716 	default:
5717 		log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5718 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5719 			  result);
5720 	}
5721 
5722 	/* an ack for dlm_recover_locks() which waits for replies from
5723 	   all the locks it sends to new masters */
5724 	dlm_recovered_lock(r);
5725  out:
5726 	unlock_rsb(r);
5727 	put_rsb(r);
5728 	dlm_put_lkb(lkb);
5729 
5730 	return 0;
5731 }
5732 
5733 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5734 		     int mode, uint32_t flags, void *name, unsigned int namelen)
5735 {
5736 	struct dlm_lkb *lkb;
5737 	struct dlm_args args;
5738 	bool do_put = true;
5739 	int error;
5740 
5741 	dlm_lock_recovery(ls);
5742 
5743 	error = create_lkb(ls, &lkb);
5744 	if (error) {
5745 		kfree(ua);
5746 		goto out;
5747 	}
5748 
5749 	trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
5750 
5751 	if (flags & DLM_LKF_VALBLK) {
5752 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5753 		if (!ua->lksb.sb_lvbptr) {
5754 			kfree(ua);
5755 			error = -ENOMEM;
5756 			goto out_put;
5757 		}
5758 	}
5759 	error = set_lock_args(mode, &ua->lksb, flags, namelen, fake_astfn, ua,
5760 			      fake_bastfn, &args);
5761 	if (error) {
5762 		kfree(ua->lksb.sb_lvbptr);
5763 		ua->lksb.sb_lvbptr = NULL;
5764 		kfree(ua);
5765 		goto out_put;
5766 	}
5767 
5768 	/* After ua is attached to lkb it will be freed by dlm_free_lkb().
5769 	   When DLM_DFL_USER_BIT is set, the dlm knows that this is a userspace
5770 	   lock and that lkb_astparam is the dlm_user_args structure. */
5771 	set_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags);
5772 	error = request_lock(ls, lkb, name, namelen, &args);
5773 
5774 	switch (error) {
5775 	case 0:
5776 		break;
5777 	case -EINPROGRESS:
5778 		error = 0;
5779 		break;
5780 	case -EAGAIN:
5781 		error = 0;
5782 		fallthrough;
5783 	default:
5784 		goto out_put;
5785 	}
5786 
5787 	/* add this new lkb to the per-process list of locks */
5788 	spin_lock_bh(&ua->proc->locks_spin);
5789 	hold_lkb(lkb);
5790 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5791 	spin_unlock_bh(&ua->proc->locks_spin);
5792 	do_put = false;
5793  out_put:
5794 	trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
5795 	if (do_put)
5796 		__put_lkb(ls, lkb);
5797  out:
5798 	dlm_unlock_recovery(ls);
5799 	return error;
5800 }
5801 
5802 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5803 		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
5804 {
5805 	struct dlm_lkb *lkb;
5806 	struct dlm_args args;
5807 	struct dlm_user_args *ua;
5808 	int error;
5809 
5810 	dlm_lock_recovery(ls);
5811 
5812 	error = find_lkb(ls, lkid, &lkb);
5813 	if (error)
5814 		goto out;
5815 
5816 	trace_dlm_lock_start(ls, lkb, NULL, 0, mode, flags);
5817 
5818 	/* user can change the params on its lock when it converts it, or
5819 	   add an lvb that didn't exist before */
5820 
5821 	ua = lkb->lkb_ua;
5822 
5823 	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5824 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5825 		if (!ua->lksb.sb_lvbptr) {
5826 			error = -ENOMEM;
5827 			goto out_put;
5828 		}
5829 	}
5830 	if (lvb_in && ua->lksb.sb_lvbptr)
5831 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5832 
5833 	ua->xid = ua_tmp->xid;
5834 	ua->castparam = ua_tmp->castparam;
5835 	ua->castaddr = ua_tmp->castaddr;
5836 	ua->bastparam = ua_tmp->bastparam;
5837 	ua->bastaddr = ua_tmp->bastaddr;
5838 	ua->user_lksb = ua_tmp->user_lksb;
5839 
5840 	error = set_lock_args(mode, &ua->lksb, flags, 0, fake_astfn, ua,
5841 			      fake_bastfn, &args);
5842 	if (error)
5843 		goto out_put;
5844 
5845 	error = convert_lock(ls, lkb, &args);
5846 
5847 	if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5848 		error = 0;
5849  out_put:
5850 	trace_dlm_lock_end(ls, lkb, NULL, 0, mode, flags, error, false);
5851 	dlm_put_lkb(lkb);
5852  out:
5853 	dlm_unlock_recovery(ls);
5854 	kfree(ua_tmp);
5855 	return error;
5856 }
5857 
5858 /*
5859  * The caller asks for an orphan lock on a given resource with a given mode.
5860  * If a matching lock exists, it's moved to the owner's list of locks and
5861  * the lkid is returned.
5862  */
5863 
5864 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5865 		     int mode, uint32_t flags, void *name, unsigned int namelen,
5866 		     uint32_t *lkid)
5867 {
5868 	struct dlm_lkb *lkb = NULL, *iter;
5869 	struct dlm_user_args *ua;
5870 	int found_other_mode = 0;
5871 	int rv = 0;
5872 
5873 	spin_lock_bh(&ls->ls_orphans_lock);
5874 	list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
5875 		if (iter->lkb_resource->res_length != namelen)
5876 			continue;
5877 		if (memcmp(iter->lkb_resource->res_name, name, namelen))
5878 			continue;
5879 		if (iter->lkb_grmode != mode) {
5880 			found_other_mode = 1;
5881 			continue;
5882 		}
5883 
5884 		lkb = iter;
5885 		list_del_init(&iter->lkb_ownqueue);
5886 		clear_bit(DLM_DFL_ORPHAN_BIT, &iter->lkb_dflags);
5887 		*lkid = iter->lkb_id;
5888 		break;
5889 	}
5890 	spin_unlock_bh(&ls->ls_orphans_lock);
5891 
5892 	if (!lkb && found_other_mode) {
5893 		rv = -EAGAIN;
5894 		goto out;
5895 	}
5896 
5897 	if (!lkb) {
5898 		rv = -ENOENT;
5899 		goto out;
5900 	}
5901 
5902 	lkb->lkb_exflags = flags;
5903 	lkb->lkb_ownpid = (int) current->pid;
5904 
5905 	ua = lkb->lkb_ua;
5906 
5907 	ua->proc = ua_tmp->proc;
5908 	ua->xid = ua_tmp->xid;
5909 	ua->castparam = ua_tmp->castparam;
5910 	ua->castaddr = ua_tmp->castaddr;
5911 	ua->bastparam = ua_tmp->bastparam;
5912 	ua->bastaddr = ua_tmp->bastaddr;
5913 	ua->user_lksb = ua_tmp->user_lksb;
5914 
5915 	/*
5916 	 * The lkb reference from the ls_orphans list was not
5917 	 * removed above, and is now considered the reference
5918 	 * for the proc locks list.
5919 	 */
5920 
5921 	spin_lock_bh(&ua->proc->locks_spin);
5922 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5923 	spin_unlock_bh(&ua->proc->locks_spin);
5924  out:
5925 	kfree(ua_tmp);
5926 	return rv;
5927 }
5928 
5929 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5930 		    uint32_t flags, uint32_t lkid, char *lvb_in)
5931 {
5932 	struct dlm_lkb *lkb;
5933 	struct dlm_args args;
5934 	struct dlm_user_args *ua;
5935 	int error;
5936 
5937 	dlm_lock_recovery(ls);
5938 
5939 	error = find_lkb(ls, lkid, &lkb);
5940 	if (error)
5941 		goto out;
5942 
5943 	trace_dlm_unlock_start(ls, lkb, flags);
5944 
5945 	ua = lkb->lkb_ua;
5946 
5947 	if (lvb_in && ua->lksb.sb_lvbptr)
5948 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5949 	if (ua_tmp->castparam)
5950 		ua->castparam = ua_tmp->castparam;
5951 	ua->user_lksb = ua_tmp->user_lksb;
5952 
5953 	error = set_unlock_args(flags, ua, &args);
5954 	if (error)
5955 		goto out_put;
5956 
5957 	error = unlock_lock(ls, lkb, &args);
5958 
5959 	if (error == -DLM_EUNLOCK)
5960 		error = 0;
5961 	/* from validate_unlock_args() */
5962 	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5963 		error = 0;
5964 	if (error)
5965 		goto out_put;
5966 
5967 	spin_lock_bh(&ua->proc->locks_spin);
5968 	/* dlm_user_add_cb() may have already taken lkb off the proc list */
5969 	if (!list_empty(&lkb->lkb_ownqueue))
5970 		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5971 	spin_unlock_bh(&ua->proc->locks_spin);
5972  out_put:
5973 	trace_dlm_unlock_end(ls, lkb, flags, error);
5974 	dlm_put_lkb(lkb);
5975  out:
5976 	dlm_unlock_recovery(ls);
5977 	kfree(ua_tmp);
5978 	return error;
5979 }
5980 
5981 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5982 		    uint32_t flags, uint32_t lkid)
5983 {
5984 	struct dlm_lkb *lkb;
5985 	struct dlm_args args;
5986 	struct dlm_user_args *ua;
5987 	int error;
5988 
5989 	dlm_lock_recovery(ls);
5990 
5991 	error = find_lkb(ls, lkid, &lkb);
5992 	if (error)
5993 		goto out;
5994 
5995 	trace_dlm_unlock_start(ls, lkb, flags);
5996 
5997 	ua = lkb->lkb_ua;
5998 	if (ua_tmp->castparam)
5999 		ua->castparam = ua_tmp->castparam;
6000 	ua->user_lksb = ua_tmp->user_lksb;
6001 
6002 	error = set_unlock_args(flags, ua, &args);
6003 	if (error)
6004 		goto out_put;
6005 
6006 	error = cancel_lock(ls, lkb, &args);
6007 
6008 	if (error == -DLM_ECANCEL)
6009 		error = 0;
6010 	/* from validate_unlock_args() */
6011 	if (error == -EBUSY)
6012 		error = 0;
6013  out_put:
6014 	trace_dlm_unlock_end(ls, lkb, flags, error);
6015 	dlm_put_lkb(lkb);
6016  out:
6017 	dlm_unlock_recovery(ls);
6018 	kfree(ua_tmp);
6019 	return error;
6020 }
6021 
6022 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
6023 {
6024 	struct dlm_lkb *lkb;
6025 	struct dlm_args args;
6026 	struct dlm_user_args *ua;
6027 	struct dlm_rsb *r;
6028 	int error;
6029 
6030 	dlm_lock_recovery(ls);
6031 
6032 	error = find_lkb(ls, lkid, &lkb);
6033 	if (error)
6034 		goto out;
6035 
6036 	trace_dlm_unlock_start(ls, lkb, flags);
6037 
6038 	ua = lkb->lkb_ua;
6039 
6040 	error = set_unlock_args(flags, ua, &args);
6041 	if (error)
6042 		goto out_put;
6043 
6044 	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
6045 
6046 	r = lkb->lkb_resource;
6047 	hold_rsb(r);
6048 	lock_rsb(r);
6049 
6050 	error = validate_unlock_args(lkb, &args);
6051 	if (error)
6052 		goto out_r;
6053 	set_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags);
6054 
6055 	error = _cancel_lock(r, lkb);
6056  out_r:
6057 	unlock_rsb(r);
6058 	put_rsb(r);
6059 
6060 	if (error == -DLM_ECANCEL)
6061 		error = 0;
6062 	/* from validate_unlock_args() */
6063 	if (error == -EBUSY)
6064 		error = 0;
6065  out_put:
6066 	trace_dlm_unlock_end(ls, lkb, flags, error);
6067 	dlm_put_lkb(lkb);
6068  out:
6069 	dlm_unlock_recovery(ls);
6070 	return error;
6071 }
6072 
6073 /* lkb's that are removed from the waiters list by revert are just left on the
6074    orphans list with the granted orphan locks, to be freed by purge */
6075 
6076 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6077 {
6078 	struct dlm_args args;
6079 	int error;
6080 
6081 	hold_lkb(lkb); /* reference for the ls_orphans list */
6082 	spin_lock_bh(&ls->ls_orphans_lock);
6083 	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6084 	spin_unlock_bh(&ls->ls_orphans_lock);
6085 
6086 	set_unlock_args(0, lkb->lkb_ua, &args);
6087 
6088 	error = cancel_lock(ls, lkb, &args);
6089 	if (error == -DLM_ECANCEL)
6090 		error = 0;
6091 	return error;
6092 }
6093 
6094 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
6095    granted.  Regardless of what rsb queue the lock is on, it's removed and
6096    freed.  The IVVALBLK flag causes the lvb on the resource to be invalidated
6097    if our lock is PW/EX (it's ignored if our granted mode is smaller.) */
6098 
6099 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6100 {
6101 	struct dlm_args args;
6102 	int error;
6103 
6104 	set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
6105 			lkb->lkb_ua, &args);
6106 
6107 	error = unlock_lock(ls, lkb, &args);
6108 	if (error == -DLM_EUNLOCK)
6109 		error = 0;
6110 	return error;
6111 }
6112 
6113 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
6114    (which does lock_rsb) due to deadlock with receiving a message that does
6115    lock_rsb followed by dlm_user_add_cb() */
6116 
6117 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
6118 				     struct dlm_user_proc *proc)
6119 {
6120 	struct dlm_lkb *lkb = NULL;
6121 
6122 	spin_lock_bh(&ls->ls_clear_proc_locks);
6123 	if (list_empty(&proc->locks))
6124 		goto out;
6125 
6126 	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
6127 	list_del_init(&lkb->lkb_ownqueue);
6128 
6129 	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6130 		set_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags);
6131 	else
6132 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6133  out:
6134 	spin_unlock_bh(&ls->ls_clear_proc_locks);
6135 	return lkb;
6136 }
6137 
6138 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
6139    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
6140    which we clear here. */
6141 
6142 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
6143    list, and no more device_writes should add lkb's to proc->locks list; so we
6144    shouldn't need to take asts_spin or locks_spin here.  this assumes that
6145    device reads/writes/closes are serialized -- FIXME: we may need to serialize
6146    them ourself. */
6147 
6148 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6149 {
6150 	struct dlm_callback *cb, *cb_safe;
6151 	struct dlm_lkb *lkb, *safe;
6152 
6153 	dlm_lock_recovery(ls);
6154 
6155 	while (1) {
6156 		lkb = del_proc_lock(ls, proc);
6157 		if (!lkb)
6158 			break;
6159 		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6160 			orphan_proc_lock(ls, lkb);
6161 		else
6162 			unlock_proc_lock(ls, lkb);
6163 
6164 		/* this removes the reference for the proc->locks list
6165 		   added by dlm_user_request, it may result in the lkb
6166 		   being freed */
6167 
6168 		dlm_put_lkb(lkb);
6169 	}
6170 
6171 	spin_lock_bh(&ls->ls_clear_proc_locks);
6172 
6173 	/* in-progress unlocks */
6174 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6175 		list_del_init(&lkb->lkb_ownqueue);
6176 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6177 		dlm_put_lkb(lkb);
6178 	}
6179 
6180 	list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6181 		list_del(&cb->list);
6182 		dlm_free_cb(cb);
6183 	}
6184 
6185 	spin_unlock_bh(&ls->ls_clear_proc_locks);
6186 	dlm_unlock_recovery(ls);
6187 }
6188 
6189 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6190 {
6191 	struct dlm_callback *cb, *cb_safe;
6192 	struct dlm_lkb *lkb, *safe;
6193 
6194 	while (1) {
6195 		lkb = NULL;
6196 		spin_lock_bh(&proc->locks_spin);
6197 		if (!list_empty(&proc->locks)) {
6198 			lkb = list_entry(proc->locks.next, struct dlm_lkb,
6199 					 lkb_ownqueue);
6200 			list_del_init(&lkb->lkb_ownqueue);
6201 		}
6202 		spin_unlock_bh(&proc->locks_spin);
6203 
6204 		if (!lkb)
6205 			break;
6206 
6207 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6208 		unlock_proc_lock(ls, lkb);
6209 		dlm_put_lkb(lkb); /* ref from proc->locks list */
6210 	}
6211 
6212 	spin_lock_bh(&proc->locks_spin);
6213 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6214 		list_del_init(&lkb->lkb_ownqueue);
6215 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6216 		dlm_put_lkb(lkb);
6217 	}
6218 	spin_unlock_bh(&proc->locks_spin);
6219 
6220 	spin_lock_bh(&proc->asts_spin);
6221 	list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6222 		list_del(&cb->list);
6223 		dlm_free_cb(cb);
6224 	}
6225 	spin_unlock_bh(&proc->asts_spin);
6226 }
6227 
6228 /* pid of 0 means purge all orphans */
6229 
6230 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6231 {
6232 	struct dlm_lkb *lkb, *safe;
6233 
6234 	spin_lock_bh(&ls->ls_orphans_lock);
6235 	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6236 		if (pid && lkb->lkb_ownpid != pid)
6237 			continue;
6238 		unlock_proc_lock(ls, lkb);
6239 		list_del_init(&lkb->lkb_ownqueue);
6240 		dlm_put_lkb(lkb);
6241 	}
6242 	spin_unlock_bh(&ls->ls_orphans_lock);
6243 }
6244 
6245 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6246 {
6247 	struct dlm_message *ms;
6248 	struct dlm_mhandle *mh;
6249 	int error;
6250 
6251 	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6252 				DLM_MSG_PURGE, &ms, &mh);
6253 	if (error)
6254 		return error;
6255 	ms->m_nodeid = cpu_to_le32(nodeid);
6256 	ms->m_pid = cpu_to_le32(pid);
6257 
6258 	return send_message(mh, ms, NULL, 0);
6259 }
6260 
6261 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6262 		   int nodeid, int pid)
6263 {
6264 	int error = 0;
6265 
6266 	if (nodeid && (nodeid != dlm_our_nodeid())) {
6267 		error = send_purge(ls, nodeid, pid);
6268 	} else {
6269 		dlm_lock_recovery(ls);
6270 		if (pid == current->pid)
6271 			purge_proc_locks(ls, proc);
6272 		else
6273 			do_purge(ls, nodeid, pid);
6274 		dlm_unlock_recovery(ls);
6275 	}
6276 	return error;
6277 }
6278 
6279 /* debug functionality */
6280 int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
6281 		      int lkb_nodeid, unsigned int lkb_dflags, int lkb_status)
6282 {
6283 	struct dlm_lksb *lksb;
6284 	struct dlm_lkb *lkb;
6285 	struct dlm_rsb *r;
6286 	int error;
6287 
6288 	/* we currently can't set a valid user lock */
6289 	if (lkb_dflags & BIT(DLM_DFL_USER_BIT))
6290 		return -EOPNOTSUPP;
6291 
6292 	lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
6293 	if (!lksb)
6294 		return -ENOMEM;
6295 
6296 	error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
6297 	if (error) {
6298 		kfree(lksb);
6299 		return error;
6300 	}
6301 
6302 	dlm_set_dflags_val(lkb, lkb_dflags);
6303 	lkb->lkb_nodeid = lkb_nodeid;
6304 	lkb->lkb_lksb = lksb;
6305 	/* user specific pointer, just don't have it NULL for kernel locks */
6306 	if (~lkb_dflags & BIT(DLM_DFL_USER_BIT))
6307 		lkb->lkb_astparam = (void *)0xDEADBEEF;
6308 
6309 	error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
6310 	if (error) {
6311 		kfree(lksb);
6312 		__put_lkb(ls, lkb);
6313 		return error;
6314 	}
6315 
6316 	lock_rsb(r);
6317 	attach_lkb(r, lkb);
6318 	add_lkb(r, lkb, lkb_status);
6319 	unlock_rsb(r);
6320 	put_rsb(r);
6321 
6322 	return 0;
6323 }
6324 
6325 int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
6326 				 int mstype, int to_nodeid)
6327 {
6328 	struct dlm_lkb *lkb;
6329 	int error;
6330 
6331 	error = find_lkb(ls, lkb_id, &lkb);
6332 	if (error)
6333 		return error;
6334 
6335 	error = add_to_waiters(lkb, mstype, to_nodeid);
6336 	dlm_put_lkb(lkb);
6337 	return error;
6338 }
6339 
6340