xref: /linux/fs/dlm/lock.c (revision c532de5a67a70f8533d495f8f2aaa9a0491c3ad0)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
6 **
7 **
8 *******************************************************************************
9 ******************************************************************************/
10 
11 /* Central locking logic has four stages:
12 
13    dlm_lock()
14    dlm_unlock()
15 
16    request_lock(ls, lkb)
17    convert_lock(ls, lkb)
18    unlock_lock(ls, lkb)
19    cancel_lock(ls, lkb)
20 
21    _request_lock(r, lkb)
22    _convert_lock(r, lkb)
23    _unlock_lock(r, lkb)
24    _cancel_lock(r, lkb)
25 
26    do_request(r, lkb)
27    do_convert(r, lkb)
28    do_unlock(r, lkb)
29    do_cancel(r, lkb)
30 
31    Stage 1 (lock, unlock) is mainly about checking input args and
32    splitting into one of the four main operations:
33 
34        dlm_lock          = request_lock
35        dlm_lock+CONVERT  = convert_lock
36        dlm_unlock        = unlock_lock
37        dlm_unlock+CANCEL = cancel_lock
38 
39    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
40    provided to the next stage.
41 
42    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
43    When remote, it calls send_xxxx(), when local it calls do_xxxx().
44 
45    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
46    given rsb and lkb and queues callbacks.
47 
48    For remote operations, send_xxxx() results in the corresponding do_xxxx()
49    function being executed on the remote node.  The connecting send/receive
50    calls on local (L) and remote (R) nodes:
51 
52    L: send_xxxx()              ->  R: receive_xxxx()
53                                    R: do_xxxx()
54    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
55 */
56 #include <trace/events/dlm.h>
57 
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "midcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77 
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89 				    const struct dlm_message *ms, bool local);
90 static int receive_extralen(const struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void deactivate_rsb(struct kref *kref);
93 
/*
 * Lock compatibility matrix - thanks Steve
 *
 * UN = Unlocked state.  Not really a state, used as a flag.
 * PD = Padding.  Used to make the matrix a nice power of two in size.
 * Other states are the same as the VMS DLM.
 *
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 * A non-zero entry marks the two modes as compatible.
 */

static const int __dlm_compat_matrix[8][8] = {
	/*         UN NL CR CW PR PW EX PD */
	/* UN */ { 1, 1, 1, 1, 1, 1, 1, 0 },
	/* NL */ { 1, 1, 1, 1, 1, 1, 1, 0 },
	/* CR */ { 1, 1, 1, 1, 1, 1, 0, 0 },
	/* CW */ { 1, 1, 1, 1, 0, 0, 0, 0 },
	/* PR */ { 1, 1, 1, 0, 1, 0, 0, 0 },
	/* PW */ { 1, 1, 1, 0, 0, 0, 0, 0 },
	/* EX */ { 1, 1, 0, 0, 0, 0, 0, 0 },
	/* PD */ { 0, 0, 0, 0, 0, 0, 0, 0 },
};
113 
/*
 * Direction of transfer of LVB data for a (granted, requested) mode pair.
 * The granted mode selects the row, the requested mode the column.
 *
 * Usage: matrix[grmode+1][rqmode+1]
 *    1 = LVB is returned to the caller
 *    0 = LVB is written to the resource
 *   -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
	/*          UN  NL  CR  CW  PR  PW  EX  PD */
	/* UN */ { -1,  1,  1,  1,  1,  1,  1, -1 },
	/* NL */ { -1,  1,  1,  1,  1,  1,  1,  0 },
	/* CR */ { -1, -1,  1,  1,  1,  1,  1,  0 },
	/* CW */ { -1, -1, -1,  1,  1,  1,  1,  0 },
	/* PR */ { -1, -1, -1, -1,  1,  1,  1,  0 },
	/* PW */ { -1,  0,  0,  0,  0,  0,  1,  0 },
	/* EX */ { -1,  0,  0,  0,  0,  0,  0,  0 },
	/* PD */ { -1,  0,  0,  0,  0,  0,  0,  0 },
};
134 
/*
 * Compatibility-matrix entry for the granted mode of lkb @gr against the
 * requested mode of lkb @rq.  Both arguments are lkb pointers.
 */
#define modes_compat(gr, rq) \
	(__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1])
137 
138 int dlm_modes_compat(int mode1, int mode2)
139 {
140 	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
141 }
142 
/*
 * Compatibility matrix for conversions with QUECVT set.
 * The granted mode selects the row, the requested mode the column.
 *
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
	/*         UN NL CR CW PR PW EX PD */
	/* UN */ { 0, 0, 0, 0, 0, 0, 0, 0 },
	/* NL */ { 0, 0, 1, 1, 1, 1, 1, 0 },
	/* CR */ { 0, 0, 0, 1, 1, 1, 1, 0 },
	/* CW */ { 0, 0, 0, 0, 1, 1, 1, 0 },
	/* PR */ { 0, 0, 0, 1, 0, 1, 1, 0 },
	/* PW */ { 0, 0, 0, 0, 0, 0, 1, 0 },
	/* EX */ { 0, 0, 0, 0, 0, 0, 0, 0 },
	/* PD */ { 0, 0, 0, 0, 0, 0, 0, 0 },
};
160 
161 void dlm_print_lkb(struct dlm_lkb *lkb)
162 {
163 	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
164 	       "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
165 	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166 	       dlm_iflags_val(lkb), lkb->lkb_status, lkb->lkb_rqmode,
167 	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
168 	       (unsigned long long)lkb->lkb_recover_seq);
169 }
170 
171 static void dlm_print_rsb(struct dlm_rsb *r)
172 {
173 	printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
174 	       "rlc %d name %s\n",
175 	       r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
176 	       r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
177 	       r->res_name);
178 }
179 
180 void dlm_dump_rsb(struct dlm_rsb *r)
181 {
182 	struct dlm_lkb *lkb;
183 
184 	dlm_print_rsb(r);
185 
186 	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
187 	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
188 	printk(KERN_ERR "rsb lookup list\n");
189 	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
190 		dlm_print_lkb(lkb);
191 	printk(KERN_ERR "rsb grant queue:\n");
192 	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
193 		dlm_print_lkb(lkb);
194 	printk(KERN_ERR "rsb convert queue:\n");
195 	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
196 		dlm_print_lkb(lkb);
197 	printk(KERN_ERR "rsb wait queue:\n");
198 	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
199 		dlm_print_lkb(lkb);
200 }
201 
202 /* Threads cannot use the lockspace while it's being recovered */
203 
204 void dlm_lock_recovery(struct dlm_ls *ls)
205 {
206 	down_read(&ls->ls_in_recovery);
207 }
208 
209 void dlm_unlock_recovery(struct dlm_ls *ls)
210 {
211 	up_read(&ls->ls_in_recovery);
212 }
213 
214 int dlm_lock_recovery_try(struct dlm_ls *ls)
215 {
216 	return down_read_trylock(&ls->ls_in_recovery);
217 }
218 
219 static inline int can_be_queued(struct dlm_lkb *lkb)
220 {
221 	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
222 }
223 
224 static inline int force_blocking_asts(struct dlm_lkb *lkb)
225 {
226 	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
227 }
228 
229 static inline int is_demoted(struct dlm_lkb *lkb)
230 {
231 	return test_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
232 }
233 
234 static inline int is_altmode(struct dlm_lkb *lkb)
235 {
236 	return test_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
237 }
238 
239 static inline int is_granted(struct dlm_lkb *lkb)
240 {
241 	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
242 }
243 
244 static inline int is_remote(struct dlm_rsb *r)
245 {
246 	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
247 	return !!r->res_nodeid;
248 }
249 
250 static inline int is_process_copy(struct dlm_lkb *lkb)
251 {
252 	return lkb->lkb_nodeid &&
253 	       !test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
254 }
255 
256 static inline int is_master_copy(struct dlm_lkb *lkb)
257 {
258 	return test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
259 }
260 
261 static inline int middle_conversion(struct dlm_lkb *lkb)
262 {
263 	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264 	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265 		return 1;
266 	return 0;
267 }
268 
269 static inline int down_conversion(struct dlm_lkb *lkb)
270 {
271 	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272 }
273 
274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
275 {
276 	return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
277 }
278 
279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
280 {
281 	return test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
282 }
283 
284 static inline int is_overlap(struct dlm_lkb *lkb)
285 {
286 	return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags) ||
287 	       test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
288 }
289 
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291 {
292 	if (is_master_copy(lkb))
293 		return;
294 
295 	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
296 
297 	if (rv == -DLM_ECANCEL &&
298 	    test_and_clear_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags))
299 		rv = -EDEADLK;
300 
301 	dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, dlm_sbflags_val(lkb));
302 }
303 
304 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
305 {
306 	queue_cast(r, lkb,
307 		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
308 }
309 
310 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
311 {
312 	if (is_master_copy(lkb)) {
313 		send_bast(r, lkb, rqmode);
314 	} else {
315 		dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
316 	}
317 }
318 
319 /*
320  * Basic operations on rsb's and lkb's
321  */
322 
323 static inline unsigned long rsb_toss_jiffies(void)
324 {
325 	return jiffies + (READ_ONCE(dlm_config.ci_toss_secs) * HZ);
326 }
327 
328 /* This is only called to add a reference when the code already holds
329    a valid reference to the rsb, so there's no need for locking. */
330 
331 static inline void hold_rsb(struct dlm_rsb *r)
332 {
333 	/* inactive rsbs are not ref counted */
334 	WARN_ON(rsb_flag(r, RSB_INACTIVE));
335 	kref_get(&r->res_ref);
336 }
337 
/* Non-static entry point for hold_rsb(). */
void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}
342 
343 /* TODO move this to lib/refcount.c */
344 static __must_check bool
345 dlm_refcount_dec_and_write_lock_bh(refcount_t *r, rwlock_t *lock)
346 __cond_acquires(lock)
347 {
348 	if (refcount_dec_not_one(r))
349 		return false;
350 
351 	write_lock_bh(lock);
352 	if (!refcount_dec_and_test(r)) {
353 		write_unlock_bh(lock);
354 		return false;
355 	}
356 
357 	return true;
358 }
359 
360 /* TODO move this to include/linux/kref.h */
361 static inline int dlm_kref_put_write_lock_bh(struct kref *kref,
362 					     void (*release)(struct kref *kref),
363 					     rwlock_t *lock)
364 {
365 	if (dlm_refcount_dec_and_write_lock_bh(&kref->refcount, lock)) {
366 		release(kref);
367 		return 1;
368 	}
369 
370 	return 0;
371 }
372 
373 static void put_rsb(struct dlm_rsb *r)
374 {
375 	struct dlm_ls *ls = r->res_ls;
376 	int rv;
377 
378 	rv = dlm_kref_put_write_lock_bh(&r->res_ref, deactivate_rsb,
379 					&ls->ls_rsbtbl_lock);
380 	if (rv)
381 		write_unlock_bh(&ls->ls_rsbtbl_lock);
382 }
383 
/* Non-static entry point for put_rsb(). */
void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}
388 
389 /* connected with timer_delete_sync() in dlm_ls_stop() to stop
390  * new timers when recovery is triggered and don't run them
391  * again until a resume_scan_timer() tries it again.
392  */
393 static void enable_scan_timer(struct dlm_ls *ls, unsigned long jiffies)
394 {
395 	if (!dlm_locking_stopped(ls))
396 		mod_timer(&ls->ls_scan_timer, jiffies);
397 }
398 
399 /* This function tries to resume the timer callback if a rsb
400  * is on the scan list and no timer is pending. It might that
401  * the first entry is on currently executed as timer callback
402  * but we don't care if a timer queued up again and does
403  * nothing. Should be a rare case.
404  */
405 void resume_scan_timer(struct dlm_ls *ls)
406 {
407 	struct dlm_rsb *r;
408 
409 	spin_lock_bh(&ls->ls_scan_lock);
410 	r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
411 				     res_scan_list);
412 	if (r && !timer_pending(&ls->ls_scan_timer))
413 		enable_scan_timer(ls, r->res_toss_time);
414 	spin_unlock_bh(&ls->ls_scan_lock);
415 }
416 
417 /* ls_rsbtbl_lock must be held */
418 
419 static void del_scan(struct dlm_ls *ls, struct dlm_rsb *r)
420 {
421 	struct dlm_rsb *first;
422 
423 	/* active rsbs should never be on the scan list */
424 	WARN_ON(!rsb_flag(r, RSB_INACTIVE));
425 
426 	spin_lock_bh(&ls->ls_scan_lock);
427 	r->res_toss_time = 0;
428 
429 	/* if the rsb is not queued do nothing */
430 	if (list_empty(&r->res_scan_list))
431 		goto out;
432 
433 	/* get the first element before delete */
434 	first = list_first_entry(&ls->ls_scan_list, struct dlm_rsb,
435 				 res_scan_list);
436 	list_del_init(&r->res_scan_list);
437 	/* check if the first element was the rsb we deleted */
438 	if (first == r) {
439 		/* try to get the new first element, if the list
440 		 * is empty now try to delete the timer, if we are
441 		 * too late we don't care.
442 		 *
443 		 * if the list isn't empty and a new first element got
444 		 * in place, set the new timer expire time.
445 		 */
446 		first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
447 						 res_scan_list);
448 		if (!first)
449 			timer_delete(&ls->ls_scan_timer);
450 		else
451 			enable_scan_timer(ls, first->res_toss_time);
452 	}
453 
454 out:
455 	spin_unlock_bh(&ls->ls_scan_lock);
456 }
457 
458 static void add_scan(struct dlm_ls *ls, struct dlm_rsb *r)
459 {
460 	int our_nodeid = dlm_our_nodeid();
461 	struct dlm_rsb *first;
462 
463 	/* A dir record for a remote master rsb should never be on the scan list. */
464 	WARN_ON(!dlm_no_directory(ls) &&
465 		(r->res_master_nodeid != our_nodeid) &&
466 		(dlm_dir_nodeid(r) == our_nodeid));
467 
468 	/* An active rsb should never be on the scan list. */
469 	WARN_ON(!rsb_flag(r, RSB_INACTIVE));
470 
471 	/* An rsb should not already be on the scan list. */
472 	WARN_ON(!list_empty(&r->res_scan_list));
473 
474 	spin_lock_bh(&ls->ls_scan_lock);
475 	/* set the new rsb absolute expire time in the rsb */
476 	r->res_toss_time = rsb_toss_jiffies();
477 	if (list_empty(&ls->ls_scan_list)) {
478 		/* if the queue is empty add the element and it's
479 		 * our new expire time
480 		 */
481 		list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
482 		enable_scan_timer(ls, r->res_toss_time);
483 	} else {
484 		/* try to get the maybe new first element and then add
485 		 * to this rsb with the oldest expire time to the end
486 		 * of the queue. If the list was empty before this
487 		 * rsb expire time is our next expiration if it wasn't
488 		 * the now new first elemet is our new expiration time
489 		 */
490 		first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
491 						 res_scan_list);
492 		list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
493 		if (!first)
494 			enable_scan_timer(ls, r->res_toss_time);
495 		else
496 			enable_scan_timer(ls, first->res_toss_time);
497 	}
498 	spin_unlock_bh(&ls->ls_scan_lock);
499 }
500 
/*
 * When the scan timer hits lock contention it retries the trylock after
 * 250 ms.  If some other mod_timer() call makes the timer expire earlier
 * in the meantime that is fine; this delay only covers the unlikely case
 * that nothing else happens in that time.
 */
#define DLM_TOSS_TIMER_RETRY_MS	250
#define DLM_TOSS_TIMER_RETRY \
	(jiffies + msecs_to_jiffies(DLM_TOSS_TIMER_RETRY_MS))
507 
508 /* Called by lockspace scan_timer to free unused rsb's. */
509 
510 void dlm_rsb_scan(struct timer_list *timer)
511 {
512 	struct dlm_ls *ls = from_timer(ls, timer, ls_scan_timer);
513 	int our_nodeid = dlm_our_nodeid();
514 	struct dlm_rsb *r;
515 	int rv;
516 
517 	while (1) {
518 		/* interrupting point to leave iteration when
519 		 * recovery waits for timer_delete_sync(), recovery
520 		 * will take care to delete everything in scan list.
521 		 */
522 		if (dlm_locking_stopped(ls))
523 			break;
524 
525 		rv = spin_trylock(&ls->ls_scan_lock);
526 		if (!rv) {
527 			/* rearm again try timer */
528 			enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
529 			break;
530 		}
531 
532 		r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
533 					     res_scan_list);
534 		if (!r) {
535 			/* the next add_scan will enable the timer again */
536 			spin_unlock(&ls->ls_scan_lock);
537 			break;
538 		}
539 
540 		/*
541 		 * If the first rsb is not yet expired, then stop because the
542 		 * list is sorted with nearest expiration first.
543 		 */
544 		if (time_before(jiffies, r->res_toss_time)) {
545 			/* rearm with the next rsb to expire in the future */
546 			enable_scan_timer(ls, r->res_toss_time);
547 			spin_unlock(&ls->ls_scan_lock);
548 			break;
549 		}
550 
551 		/* in find_rsb_dir/nodir there is a reverse order of this
552 		 * lock, however this is only a trylock if we hit some
553 		 * possible contention we try it again.
554 		 */
555 		rv = write_trylock(&ls->ls_rsbtbl_lock);
556 		if (!rv) {
557 			spin_unlock(&ls->ls_scan_lock);
558 			/* rearm again try timer */
559 			enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
560 			break;
561 		}
562 
563 		list_del(&r->res_slow_list);
564 		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
565 				       dlm_rhash_rsb_params);
566 		rsb_clear_flag(r, RSB_HASHED);
567 
568 		/* ls_rsbtbl_lock is not needed when calling send_remove() */
569 		write_unlock(&ls->ls_rsbtbl_lock);
570 
571 		list_del_init(&r->res_scan_list);
572 		spin_unlock(&ls->ls_scan_lock);
573 
574 		/* An rsb that is a dir record for a remote master rsb
575 		 * cannot be removed, and should not have a timer enabled.
576 		 */
577 		WARN_ON(!dlm_no_directory(ls) &&
578 			(r->res_master_nodeid != our_nodeid) &&
579 			(dlm_dir_nodeid(r) == our_nodeid));
580 
581 		/* We're the master of this rsb but we're not
582 		 * the directory record, so we need to tell the
583 		 * dir node to remove the dir record
584 		 */
585 		if (!dlm_no_directory(ls) &&
586 		    (r->res_master_nodeid == our_nodeid) &&
587 		    (dlm_dir_nodeid(r) != our_nodeid))
588 			send_remove(r);
589 
590 		free_inactive_rsb(r);
591 	}
592 }
593 
594 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
595    unlock any spinlocks, go back and call pre_rsb_struct again.
596    Otherwise, take an rsb off the list and return it. */
597 
598 static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
599 			  struct dlm_rsb **r_ret)
600 {
601 	struct dlm_rsb *r;
602 
603 	r = dlm_allocate_rsb();
604 	if (!r)
605 		return -ENOMEM;
606 
607 	r->res_ls = ls;
608 	r->res_length = len;
609 	memcpy(r->res_name, name, len);
610 	spin_lock_init(&r->res_lock);
611 
612 	INIT_LIST_HEAD(&r->res_lookup);
613 	INIT_LIST_HEAD(&r->res_grantqueue);
614 	INIT_LIST_HEAD(&r->res_convertqueue);
615 	INIT_LIST_HEAD(&r->res_waitqueue);
616 	INIT_LIST_HEAD(&r->res_root_list);
617 	INIT_LIST_HEAD(&r->res_scan_list);
618 	INIT_LIST_HEAD(&r->res_recover_list);
619 	INIT_LIST_HEAD(&r->res_masters_list);
620 
621 	*r_ret = r;
622 	return 0;
623 }
624 
625 int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
626 			struct dlm_rsb **r_ret)
627 {
628 	char key[DLM_RESNAME_MAXLEN] = {};
629 
630 	memcpy(key, name, len);
631 	*r_ret = rhashtable_lookup_fast(rhash, &key, dlm_rhash_rsb_params);
632 	if (*r_ret)
633 		return 0;
634 
635 	return -EBADR;
636 }
637 
638 static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
639 {
640 	int rv;
641 
642 	rv = rhashtable_insert_fast(rhash, &rsb->res_node,
643 				    dlm_rhash_rsb_params);
644 	if (!rv)
645 		rsb_set_flag(rsb, RSB_HASHED);
646 
647 	return rv;
648 }
649 
650 /*
651  * Find rsb in rsbtbl and potentially create/add one
652  *
653  * Delaying the release of rsb's has a similar benefit to applications keeping
654  * NL locks on an rsb, but without the guarantee that the cached master value
655  * will still be valid when the rsb is reused.  Apps aren't always smart enough
656  * to keep NL locks on an rsb that they may lock again shortly; this can lead
657  * to excessive master lookups and removals if we don't delay the release.
658  *
659  * Searching for an rsb means looking through both the normal list and toss
660  * list.  When found on the toss list the rsb is moved to the normal list with
661  * ref count of 1; when found on normal list the ref count is incremented.
662  *
663  * rsb's on the keep list are being used locally and refcounted.
664  * rsb's on the toss list are not being used locally, and are not refcounted.
665  *
666  * The toss list rsb's were either
667  * - previously used locally but not any more (were on keep list, then
668  *   moved to toss list when last refcount dropped)
669  * - created and put on toss list as a directory record for a lookup
670  *   (we are the dir node for the res, but are not using the res right now,
671  *   but some other node is)
672  *
673  * The purpose of find_rsb() is to return a refcounted rsb for local use.
674  * So, if the given rsb is on the toss list, it is moved to the keep list
675  * before being returned.
676  *
677  * deactivate_rsb() happens when all local usage of the rsb is done, i.e. no
678  * more refcounts exist, so the rsb is moved from the keep list to the
679  * toss list.
680  *
681  * rsb's on both keep and toss lists are used for doing a name to master
682  * lookups.  rsb's that are in use locally (and being refcounted) are on
683  * the keep list, rsb's that are not in use locally (not refcounted) and
684  * only exist for name/master lookups are on the toss list.
685  *
 * rsb's on the toss list whose dir_nodeid is not local can have stale
687  * name/master mappings.  So, remote requests on such rsb's can potentially
688  * return with an error, which means the mapping is stale and needs to
689  * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
690  * first_lkid is to keep only a single outstanding request on an rsb
691  * while that rsb has a potentially stale master.)
692  */
693 
694 static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
695 			uint32_t hash, int dir_nodeid, int from_nodeid,
696 			unsigned int flags, struct dlm_rsb **r_ret)
697 {
698 	struct dlm_rsb *r = NULL;
699 	int our_nodeid = dlm_our_nodeid();
700 	int from_local = 0;
701 	int from_other = 0;
702 	int from_dir = 0;
703 	int create = 0;
704 	int error;
705 
706 	if (flags & R_RECEIVE_REQUEST) {
707 		if (from_nodeid == dir_nodeid)
708 			from_dir = 1;
709 		else
710 			from_other = 1;
711 	} else if (flags & R_REQUEST) {
712 		from_local = 1;
713 	}
714 
715 	/*
716 	 * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
717 	 * from_nodeid has sent us a lock in dlm_recover_locks, believing
718 	 * we're the new master.  Our local recovery may not have set
719 	 * res_master_nodeid to our_nodeid yet, so allow either.  Don't
720 	 * create the rsb; dlm_recover_process_copy() will handle EBADR
721 	 * by resending.
722 	 *
723 	 * If someone sends us a request, we are the dir node, and we do
724 	 * not find the rsb anywhere, then recreate it.  This happens if
725 	 * someone sends us a request after we have removed/freed an rsb.
726 	 * (They sent a request instead of lookup because they are using
727 	 * an rsb taken from their scan list.)
728 	 */
729 
730 	if (from_local || from_dir ||
731 	    (from_other && (dir_nodeid == our_nodeid))) {
732 		create = 1;
733 	}
734 
735  retry:
736 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
737 	if (error)
738 		goto do_new;
739 
740 	/* check if the rsb is active under read lock - likely path */
741 	read_lock_bh(&ls->ls_rsbtbl_lock);
742 	if (!rsb_flag(r, RSB_HASHED)) {
743 		read_unlock_bh(&ls->ls_rsbtbl_lock);
744 		goto do_new;
745 	}
746 
747 	/*
748 	 * rsb is active, so we can't check master_nodeid without lock_rsb.
749 	 */
750 
751 	if (rsb_flag(r, RSB_INACTIVE)) {
752 		read_unlock_bh(&ls->ls_rsbtbl_lock);
753 		goto do_inactive;
754 	}
755 
756 	kref_get(&r->res_ref);
757 	read_unlock_bh(&ls->ls_rsbtbl_lock);
758 	goto out;
759 
760 
761  do_inactive:
762 	write_lock_bh(&ls->ls_rsbtbl_lock);
763 
764 	/*
765 	 * The expectation here is that the rsb will have HASHED and
766 	 * INACTIVE flags set, and that the rsb can be moved from
767 	 * inactive back to active again.  However, between releasing
768 	 * the read lock and acquiring the write lock, this rsb could
769 	 * have been removed from rsbtbl, and had HASHED cleared, to
770 	 * be freed.  To deal with this case, we would normally need
771 	 * to repeat dlm_search_rsb_tree while holding the write lock,
772 	 * but rcu allows us to simply check the HASHED flag, because
773 	 * the rcu read lock means the rsb will not be freed yet.
774 	 * If the HASHED flag is not set, then the rsb is being freed,
775 	 * so we add a new rsb struct.  If the HASHED flag is set,
776 	 * and INACTIVE is not set, it means another thread has
777 	 * made the rsb active, as we're expecting to do here, and
778 	 * we just repeat the lookup (this will be very unlikely.)
779 	 */
780 	if (rsb_flag(r, RSB_HASHED)) {
781 		if (!rsb_flag(r, RSB_INACTIVE)) {
782 			write_unlock_bh(&ls->ls_rsbtbl_lock);
783 			goto retry;
784 		}
785 	} else {
786 		write_unlock_bh(&ls->ls_rsbtbl_lock);
787 		goto do_new;
788 	}
789 
790 	/*
791 	 * rsb found inactive (master_nodeid may be out of date unless
792 	 * we are the dir_nodeid or were the master)  No other thread
793 	 * is using this rsb because it's inactive, so we can
794 	 * look at or update res_master_nodeid without lock_rsb.
795 	 */
796 
797 	if ((r->res_master_nodeid != our_nodeid) && from_other) {
798 		/* our rsb was not master, and another node (not the dir node)
799 		   has sent us a request */
800 		log_debug(ls, "find_rsb inactive from_other %d master %d dir %d %s",
801 			  from_nodeid, r->res_master_nodeid, dir_nodeid,
802 			  r->res_name);
803 		write_unlock_bh(&ls->ls_rsbtbl_lock);
804 		error = -ENOTBLK;
805 		goto out;
806 	}
807 
808 	if ((r->res_master_nodeid != our_nodeid) && from_dir) {
809 		/* don't think this should ever happen */
810 		log_error(ls, "find_rsb inactive from_dir %d master %d",
811 			  from_nodeid, r->res_master_nodeid);
812 		dlm_print_rsb(r);
813 		/* fix it and go on */
814 		r->res_master_nodeid = our_nodeid;
815 		r->res_nodeid = 0;
816 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
817 		r->res_first_lkid = 0;
818 	}
819 
820 	if (from_local && (r->res_master_nodeid != our_nodeid)) {
821 		/* Because we have held no locks on this rsb,
822 		   res_master_nodeid could have become stale. */
823 		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
824 		r->res_first_lkid = 0;
825 	}
826 
827 	/* A dir record will not be on the scan list. */
828 	if (r->res_dir_nodeid != our_nodeid)
829 		del_scan(ls, r);
830 	list_move(&r->res_slow_list, &ls->ls_slow_active);
831 	rsb_clear_flag(r, RSB_INACTIVE);
832 	kref_init(&r->res_ref); /* ref is now used in active state */
833 	write_unlock_bh(&ls->ls_rsbtbl_lock);
834 
835 	goto out;
836 
837 
838  do_new:
839 	/*
840 	 * rsb not found
841 	 */
842 
843 	if (error == -EBADR && !create)
844 		goto out;
845 
846 	error = get_rsb_struct(ls, name, len, &r);
847 	if (WARN_ON_ONCE(error))
848 		goto out;
849 
850 	r->res_hash = hash;
851 	r->res_dir_nodeid = dir_nodeid;
852 	kref_init(&r->res_ref);
853 
854 	if (from_dir) {
855 		/* want to see how often this happens */
856 		log_debug(ls, "find_rsb new from_dir %d recreate %s",
857 			  from_nodeid, r->res_name);
858 		r->res_master_nodeid = our_nodeid;
859 		r->res_nodeid = 0;
860 		goto out_add;
861 	}
862 
863 	if (from_other && (dir_nodeid != our_nodeid)) {
864 		/* should never happen */
865 		log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
866 			  from_nodeid, dir_nodeid, our_nodeid, r->res_name);
867 		dlm_free_rsb(r);
868 		r = NULL;
869 		error = -ENOTBLK;
870 		goto out;
871 	}
872 
873 	if (from_other) {
874 		log_debug(ls, "find_rsb new from_other %d dir %d %s",
875 			  from_nodeid, dir_nodeid, r->res_name);
876 	}
877 
878 	if (dir_nodeid == our_nodeid) {
879 		/* When we are the dir nodeid, we can set the master
880 		   node immediately */
881 		r->res_master_nodeid = our_nodeid;
882 		r->res_nodeid = 0;
883 	} else {
884 		/* set_master will send_lookup to dir_nodeid */
885 		r->res_master_nodeid = 0;
886 		r->res_nodeid = -1;
887 	}
888 
889  out_add:
890 
891 	write_lock_bh(&ls->ls_rsbtbl_lock);
892 	error = rsb_insert(r, &ls->ls_rsbtbl);
893 	if (error == -EEXIST) {
894 		/* somebody else was faster and it seems the
895 		 * rsb exists now, we do a whole relookup
896 		 */
897 		write_unlock_bh(&ls->ls_rsbtbl_lock);
898 		dlm_free_rsb(r);
899 		goto retry;
900 	} else if (!error) {
901 		list_add(&r->res_slow_list, &ls->ls_slow_active);
902 	}
903 	write_unlock_bh(&ls->ls_rsbtbl_lock);
904  out:
905 	*r_ret = r;
906 	return error;
907 }
908 
/*
 * find_rsb_nodir - find or create the rsb for a resource name; find_rsb()
 * selects this variant when dlm_no_directory(ls) is true.
 *
 * During recovery, other nodes can send us new MSTCPY locks (from
 * dlm_recover_locks) before we've made ourself master (in
 * dlm_recover_masters).
 *
 * The lookup has three outcomes:
 *  - rsb found and active: take a reference and return it.
 *  - rsb found but inactive: re-check under the rsbtbl write lock, then
 *    move it back to the active list and re-initialize its refcount.
 *  - rsb missing (or no longer hashed): allocate a new one and insert it;
 *    if another thread inserted first (-EEXIST) start over.
 *
 * find_rsb() calls this with rcu_read_lock() held.
 *
 * @r_ret is written on every exit path, including the error paths.
 *
 * Returns 0 on success, -ENOTBLK when an inactive rsb that we do not
 * master is requested by another node outside recovery, or the error
 * from get_rsb_struct()/rsb_insert().
 */

static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
			  uint32_t hash, int dir_nodeid, int from_nodeid,
			  unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r = NULL;
	int our_nodeid = dlm_our_nodeid();
	int recover = (flags & R_RECEIVE_RECOVER);
	int error;

 retry:
	/* a nonzero return means the name is not in the table */
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
	if (error)
		goto do_new;

	/* check if the rsb is in active state under read lock - likely path */
	read_lock_bh(&ls->ls_rsbtbl_lock);
	if (!rsb_flag(r, RSB_HASHED)) {
		/* found by the search but already unhashed: treat as missing */
		read_unlock_bh(&ls->ls_rsbtbl_lock);
		goto do_new;
	}

	if (rsb_flag(r, RSB_INACTIVE)) {
		read_unlock_bh(&ls->ls_rsbtbl_lock);
		goto do_inactive;
	}

	/*
	 * rsb is active, so we can't check master_nodeid without lock_rsb.
	 */

	kref_get(&r->res_ref);
	read_unlock_bh(&ls->ls_rsbtbl_lock);

	/* error is still 0 from the successful search */
	goto out;


 do_inactive:
	write_lock_bh(&ls->ls_rsbtbl_lock);

	/*
	 * See comment in find_rsb_dir.  The flags were sampled under the
	 * read lock, which was dropped, so they are re-checked here under
	 * the write lock before the rsb is modified.
	 */
	if (rsb_flag(r, RSB_HASHED)) {
		if (!rsb_flag(r, RSB_INACTIVE)) {
			/* became active in the meantime: redo the lookup */
			write_unlock_bh(&ls->ls_rsbtbl_lock);
			goto retry;
		}
	} else {
		/* was unhashed in the meantime: create a fresh rsb */
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		goto do_new;
	}


	/*
	 * rsb found inactive. No other thread is using this rsb because
	 * it's inactive, so we can look at or update res_master_nodeid
	 * without lock_rsb.
	 */

	if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
		/* our rsb is not master, and another node has sent us a
		   request; this should never happen */
		log_error(ls, "find_rsb inactive from_nodeid %d master %d dir %d",
			  from_nodeid, r->res_master_nodeid, dir_nodeid);
		dlm_print_rsb(r);
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		error = -ENOTBLK;
		goto out;
	}

	if (!recover && (r->res_master_nodeid != our_nodeid) &&
	    (dir_nodeid == our_nodeid)) {
		/* our rsb is not master, and we are dir; may as well fix it;
		   this should never happen */
		log_error(ls, "find_rsb inactive our %d master %d dir %d",
			  our_nodeid, r->res_master_nodeid, dir_nodeid);
		dlm_print_rsb(r);
		r->res_master_nodeid = our_nodeid;
		r->res_nodeid = 0;
	}

	/* reactivate: back on the active list with a fresh refcount */
	list_move(&r->res_slow_list, &ls->ls_slow_active);
	rsb_clear_flag(r, RSB_INACTIVE);
	kref_init(&r->res_ref);
	del_scan(ls, r);
	write_unlock_bh(&ls->ls_rsbtbl_lock);

	goto out;


 do_new:
	/*
	 * rsb not found
	 */

	error = get_rsb_struct(ls, name, len, &r);
	if (WARN_ON_ONCE(error))
		goto out;

	/* the new rsb starts with the hash-derived node as its master */
	r->res_hash = hash;
	r->res_dir_nodeid = dir_nodeid;
	r->res_master_nodeid = dir_nodeid;
	r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
	kref_init(&r->res_ref);

	write_lock_bh(&ls->ls_rsbtbl_lock);
	error = rsb_insert(r, &ls->ls_rsbtbl);
	if (error == -EEXIST) {
		/* somebody else was faster and it seems the
		 * rsb exists now, we do a whole relookup
		 */
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		dlm_free_rsb(r);
		goto retry;
	} else if (!error) {
		list_add(&r->res_slow_list, &ls->ls_slow_active);
	}
	write_unlock_bh(&ls->ls_rsbtbl_lock);

 out:
	*r_ret = r;
	return error;
}
1034 
1035 /*
1036  * rsb rcu usage
1037  *
1038  * While rcu read lock is held, the rsb cannot be freed,
1039  * which allows a lookup optimization.
1040  *
1041  * Two threads are accessing the same rsb concurrently,
1042  * the first (A) is trying to use the rsb, the second (B)
1043  * is trying to free the rsb.
1044  *
1045  * thread A                 thread B
1046  * (trying to use rsb)      (trying to free rsb)
1047  *
1048  * A1. rcu read lock
1049  * A2. rsbtbl read lock
1050  * A3. look up rsb in rsbtbl
1051  * A4. rsbtbl read unlock
1052  *                          B1. rsbtbl write lock
1053  *                          B2. look up rsb in rsbtbl
1054  *                          B3. remove rsb from rsbtbl
1055  *                          B4. clear rsb HASHED flag
1056  *                          B5. rsbtbl write unlock
1057  *                          B6. begin freeing rsb using rcu...
1058  *
1059  * (rsb is inactive, so try to make it active again)
1060  * A5. read rsb HASHED flag (safe because rsb is not freed yet)
 * A6. the rsb HASHED flag is not set, which means the rsb
 *     is being removed from rsbtbl and freed, so don't use it.
1063  * A7. rcu read unlock
1064  *
1065  *                          B7. ...finish freeing rsb using rcu
1066  * A8. create a new rsb
1067  *
1068  * Without the rcu optimization, steps A5-8 would need to do
1069  * an extra rsbtbl lookup:
1070  * A5. rsbtbl write lock
1071  * A6. look up rsb in rsbtbl, not found
1072  * A7. rsbtbl write unlock
1073  * A8. create a new rsb
1074  */
1075 
1076 static int find_rsb(struct dlm_ls *ls, const void *name, int len,
1077 		    int from_nodeid, unsigned int flags,
1078 		    struct dlm_rsb **r_ret)
1079 {
1080 	int dir_nodeid;
1081 	uint32_t hash;
1082 	int rv;
1083 
1084 	if (len > DLM_RESNAME_MAXLEN)
1085 		return -EINVAL;
1086 
1087 	hash = jhash(name, len, 0);
1088 	dir_nodeid = dlm_hash2nodeid(ls, hash);
1089 
1090 	rcu_read_lock();
1091 	if (dlm_no_directory(ls))
1092 		rv = find_rsb_nodir(ls, name, len, hash, dir_nodeid,
1093 				      from_nodeid, flags, r_ret);
1094 	else
1095 		rv = find_rsb_dir(ls, name, len, hash, dir_nodeid,
1096 				    from_nodeid, flags, r_ret);
1097 	rcu_read_unlock();
1098 	return rv;
1099 }
1100 
1101 /* we have received a request and found that res_master_nodeid != our_nodeid,
1102    so we need to return an error or make ourself the master */
1103 
1104 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
1105 				  int from_nodeid)
1106 {
1107 	if (dlm_no_directory(ls)) {
1108 		log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
1109 			  from_nodeid, r->res_master_nodeid,
1110 			  r->res_dir_nodeid);
1111 		dlm_print_rsb(r);
1112 		return -ENOTBLK;
1113 	}
1114 
1115 	if (from_nodeid != r->res_dir_nodeid) {
1116 		/* our rsb is not master, and another node (not the dir node)
1117 	   	   has sent us a request.  this is much more common when our
1118 	   	   master_nodeid is zero, so limit debug to non-zero.  */
1119 
1120 		if (r->res_master_nodeid) {
1121 			log_debug(ls, "validate master from_other %d master %d "
1122 				  "dir %d first %x %s", from_nodeid,
1123 				  r->res_master_nodeid, r->res_dir_nodeid,
1124 				  r->res_first_lkid, r->res_name);
1125 		}
1126 		return -ENOTBLK;
1127 	} else {
1128 		/* our rsb is not master, but the dir nodeid has sent us a
1129 	   	   request; this could happen with master 0 / res_nodeid -1 */
1130 
1131 		if (r->res_master_nodeid) {
1132 			log_error(ls, "validate master from_dir %d master %d "
1133 				  "first %x %s",
1134 				  from_nodeid, r->res_master_nodeid,
1135 				  r->res_first_lkid, r->res_name);
1136 		}
1137 
1138 		r->res_master_nodeid = dlm_our_nodeid();
1139 		r->res_nodeid = 0;
1140 		return 0;
1141 	}
1142 }
1143 
1144 static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
1145 				int from_nodeid, bool is_inactive, unsigned int flags,
1146 				int *r_nodeid, int *result)
1147 {
1148 	int fix_master = (flags & DLM_LU_RECOVER_MASTER);
1149 	int from_master = (flags & DLM_LU_RECOVER_DIR);
1150 
1151 	if (r->res_dir_nodeid != our_nodeid) {
1152 		/* should not happen, but may as well fix it and carry on */
1153 		log_error(ls, "%s res_dir %d our %d %s", __func__,
1154 			  r->res_dir_nodeid, our_nodeid, r->res_name);
1155 		r->res_dir_nodeid = our_nodeid;
1156 	}
1157 
1158 	if (fix_master && r->res_master_nodeid && dlm_is_removed(ls, r->res_master_nodeid)) {
1159 		/* Recovery uses this function to set a new master when
1160 		 * the previous master failed.  Setting NEW_MASTER will
1161 		 * force dlm_recover_masters to call recover_master on this
1162 		 * rsb even though the res_nodeid is no longer removed.
1163 		 */
1164 
1165 		r->res_master_nodeid = from_nodeid;
1166 		r->res_nodeid = from_nodeid;
1167 		rsb_set_flag(r, RSB_NEW_MASTER);
1168 
1169 		if (is_inactive) {
1170 			/* I don't think we should ever find it inactive. */
1171 			log_error(ls, "%s fix_master inactive", __func__);
1172 			dlm_dump_rsb(r);
1173 		}
1174 	}
1175 
1176 	if (from_master && (r->res_master_nodeid != from_nodeid)) {
1177 		/* this will happen if from_nodeid became master during
1178 		 * a previous recovery cycle, and we aborted the previous
1179 		 * cycle before recovering this master value
1180 		 */
1181 
1182 		log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
1183 			  __func__, from_nodeid, r->res_master_nodeid,
1184 			  r->res_nodeid, r->res_first_lkid, r->res_name);
1185 
1186 		if (r->res_master_nodeid == our_nodeid) {
1187 			log_error(ls, "from_master %d our_master", from_nodeid);
1188 			dlm_dump_rsb(r);
1189 			goto ret_assign;
1190 		}
1191 
1192 		r->res_master_nodeid = from_nodeid;
1193 		r->res_nodeid = from_nodeid;
1194 		rsb_set_flag(r, RSB_NEW_MASTER);
1195 	}
1196 
1197 	if (!r->res_master_nodeid) {
1198 		/* this will happen if recovery happens while we're looking
1199 		 * up the master for this rsb
1200 		 */
1201 
1202 		log_debug(ls, "%s master 0 to %d first %x %s", __func__,
1203 			  from_nodeid, r->res_first_lkid, r->res_name);
1204 		r->res_master_nodeid = from_nodeid;
1205 		r->res_nodeid = from_nodeid;
1206 	}
1207 
1208 	if (!from_master && !fix_master &&
1209 	    (r->res_master_nodeid == from_nodeid)) {
1210 		/* this can happen when the master sends remove, the dir node
1211 		 * finds the rsb on the active list and ignores the remove,
1212 		 * and the former master sends a lookup
1213 		 */
1214 
1215 		log_limit(ls, "%s from master %d flags %x first %x %s",
1216 			  __func__, from_nodeid, flags, r->res_first_lkid,
1217 			  r->res_name);
1218 	}
1219 
1220  ret_assign:
1221 	*r_nodeid = r->res_master_nodeid;
1222 	if (result)
1223 		*result = DLM_LU_MATCH;
1224 }
1225 
1226 /*
1227  * We're the dir node for this res and another node wants to know the
1228  * master nodeid.  During normal operation (non recovery) this is only
1229  * called from receive_lookup(); master lookups when the local node is
1230  * the dir node are done by find_rsb().
1231  *
1232  * normal operation, we are the dir node for a resource
1233  * . _request_lock
1234  * . set_master
1235  * . send_lookup
1236  * . receive_lookup
1237  * . dlm_master_lookup flags 0
1238  *
1239  * recover directory, we are rebuilding dir for all resources
1240  * . dlm_recover_directory
1241  * . dlm_rcom_names
1242  *   remote node sends back the rsb names it is master of and we are dir of
1243  * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
1244  *   we either create new rsb setting remote node as master, or find existing
1245  *   rsb and set master to be the remote node.
1246  *
1247  * recover masters, we are finding the new master for resources
1248  * . dlm_recover_masters
1249  * . recover_master
1250  * . dlm_send_rcom_lookup
1251  * . receive_rcom_lookup
1252  * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
1253  */
1254 
/*
 * _dlm_master_lookup - directory-node side of a master lookup.
 *
 * Validates that @name hashes to our node, then finds the rsb for @name:
 *  - active rsb: hold and lock it, then let __dlm_master_lookup() report
 *    (and possibly repair) the master;
 *  - inactive rsb: same, but under the rsbtbl write lock instead of
 *    lock_rsb;
 *  - no rsb: create an inactive one that records @from_nodeid as master.
 *
 * On success *r_nodeid is the master nodeid and, if @result is not NULL,
 * *result is DLM_LU_MATCH (existing rsb) or DLM_LU_ADD (new rsb).
 *
 * Returns 0 on success, -EINVAL for an over-long name, a lookup from
 * ourself, or a name we are not the directory node for (in that last
 * case *r_nodeid is set to -1), or the error from get_rsb_struct().
 *
 * dlm_master_lookup() calls this with rcu_read_lock() held.
 */
static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
			      int len, unsigned int flags, int *r_nodeid, int *result)
{
	struct dlm_rsb *r = NULL;
	uint32_t hash;
	int our_nodeid = dlm_our_nodeid();
	int dir_nodeid, error;

	if (len > DLM_RESNAME_MAXLEN)
		return -EINVAL;

	if (from_nodeid == our_nodeid) {
		log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
			  our_nodeid, flags);
		return -EINVAL;
	}

	hash = jhash(name, len, 0);
	dir_nodeid = dlm_hash2nodeid(ls, hash);
	if (dir_nodeid != our_nodeid) {
		log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
			  from_nodeid, dir_nodeid, our_nodeid, hash,
			  ls->ls_num_nodes);
		*r_nodeid = -1;
		return -EINVAL;
	}

 retry:
	/* a nonzero return means the name is not in the table */
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
	if (error)
		goto not_found;

	/* check if the rsb is active under read lock - likely path */
	read_lock_bh(&ls->ls_rsbtbl_lock);
	if (!rsb_flag(r, RSB_HASHED)) {
		read_unlock_bh(&ls->ls_rsbtbl_lock);
		goto not_found;
	}

	if (rsb_flag(r, RSB_INACTIVE)) {
		read_unlock_bh(&ls->ls_rsbtbl_lock);
		goto do_inactive;
	}

	/* because the rsb is active, we need to lock_rsb before
	 * checking/changing res_master_nodeid
	 */

	hold_rsb(r);
	read_unlock_bh(&ls->ls_rsbtbl_lock);
	lock_rsb(r);

	__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
			    flags, r_nodeid, result);

	/* the rsb was active */
	unlock_rsb(r);
	put_rsb(r);

	return 0;

 do_inactive:
	/* unlikely path - check if still part of ls_rsbtbl */
	write_lock_bh(&ls->ls_rsbtbl_lock);

	/* see comment in find_rsb_dir */
	if (rsb_flag(r, RSB_HASHED)) {
		if (!rsb_flag(r, RSB_INACTIVE)) {
			write_unlock_bh(&ls->ls_rsbtbl_lock);
			/* something has changed, very unlikely but
			 * try again
			 */
			goto retry;
		}
	} else {
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		goto not_found;
	}

	/* because the rsb is inactive, it's not refcounted and lock_rsb
	   is not used, but is protected by the rsbtbl lock */

	__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
			    r_nodeid, result);

	/* A dir record rsb should never be on scan list. */
	/* Try to fix this with del_scan? */
	WARN_ON(!list_empty(&r->res_scan_list));

	write_unlock_bh(&ls->ls_rsbtbl_lock);

	return 0;

 not_found:
	error = get_rsb_struct(ls, name, len, &r);
	if (WARN_ON_ONCE(error))
		goto out;

	/* new dir record: we are dir, the requester becomes master, and
	 * the rsb starts out inactive
	 */
	r->res_hash = hash;
	r->res_dir_nodeid = our_nodeid;
	r->res_master_nodeid = from_nodeid;
	r->res_nodeid = from_nodeid;
	rsb_set_flag(r, RSB_INACTIVE);

	write_lock_bh(&ls->ls_rsbtbl_lock);
	error = rsb_insert(r, &ls->ls_rsbtbl);
	if (error == -EEXIST) {
		/* somebody else was faster and it seems the
		 * rsb exists now, we do a whole relookup
		 */
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		dlm_free_rsb(r);
		goto retry;
	} else if (error) {
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		/* should never happen */
		dlm_free_rsb(r);
		goto retry;
	}

	list_add(&r->res_slow_list, &ls->ls_slow_inactive);
	write_unlock_bh(&ls->ls_rsbtbl_lock);

	if (result)
		*result = DLM_LU_ADD;
	*r_nodeid = from_nodeid;
 out:
	return error;
}
1384 
/*
 * dlm_master_lookup - run _dlm_master_lookup() inside an rcu read-side
 * critical section and pass its return value through unchanged.
 */
int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
		      int len, unsigned int flags, int *r_nodeid, int *result)
{
	int error;

	rcu_read_lock();
	error = _dlm_master_lookup(ls, from_nodeid, name, len, flags,
				   r_nodeid, result);
	rcu_read_unlock();

	return error;
}
1394 
1395 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1396 {
1397 	struct dlm_rsb *r;
1398 
1399 	read_lock_bh(&ls->ls_rsbtbl_lock);
1400 	list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
1401 		if (r->res_hash == hash)
1402 			dlm_dump_rsb(r);
1403 	}
1404 	read_unlock_bh(&ls->ls_rsbtbl_lock);
1405 }
1406 
1407 void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
1408 {
1409 	struct dlm_rsb *r = NULL;
1410 	int error;
1411 
1412 	rcu_read_lock();
1413 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1414 	if (!error)
1415 		goto out;
1416 
1417 	dlm_dump_rsb(r);
1418  out:
1419 	rcu_read_unlock();
1420 }
1421 
1422 static void deactivate_rsb(struct kref *kref)
1423 {
1424 	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1425 	struct dlm_ls *ls = r->res_ls;
1426 	int our_nodeid = dlm_our_nodeid();
1427 
1428 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1429 	rsb_set_flag(r, RSB_INACTIVE);
1430 	list_move(&r->res_slow_list, &ls->ls_slow_inactive);
1431 
1432 	/*
1433 	 * When the rsb becomes unused:
1434 	 * - If it's not a dir record for a remote master rsb,
1435 	 *   then it is put on the scan list to be freed.
1436 	 * - If it's a dir record for a remote master rsb,
1437 	 *   then it is kept in the inactive state until
1438 	 *   receive_remove() from the master node.
1439 	 */
1440 	if (!dlm_no_directory(ls) &&
1441 	    (r->res_master_nodeid != our_nodeid) &&
1442 	    (dlm_dir_nodeid(r) != our_nodeid))
1443 		add_scan(ls, r);
1444 
1445 	if (r->res_lvbptr) {
1446 		dlm_free_lvb(r->res_lvbptr);
1447 		r->res_lvbptr = NULL;
1448 	}
1449 }
1450 
1451 void free_inactive_rsb(struct dlm_rsb *r)
1452 {
1453 	WARN_ON_ONCE(!rsb_flag(r, RSB_INACTIVE));
1454 
1455 	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1456 	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1457 	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1458 	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1459 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1460 	DLM_ASSERT(list_empty(&r->res_scan_list), dlm_dump_rsb(r););
1461 	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1462 	DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r););
1463 
1464 	dlm_free_rsb(r);
1465 }
1466 
1467 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1468    The rsb must exist as long as any lkb's for it do. */
1469 
1470 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1471 {
1472 	hold_rsb(r);
1473 	lkb->lkb_resource = r;
1474 }
1475 
1476 static void detach_lkb(struct dlm_lkb *lkb)
1477 {
1478 	if (lkb->lkb_resource) {
1479 		put_rsb(lkb->lkb_resource);
1480 		lkb->lkb_resource = NULL;
1481 	}
1482 }
1483 
1484 static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
1485 		       unsigned long start, unsigned long end)
1486 {
1487 	struct xa_limit limit;
1488 	struct dlm_lkb *lkb;
1489 	int rv;
1490 
1491 	limit.max = end;
1492 	limit.min = start;
1493 
1494 	lkb = dlm_allocate_lkb();
1495 	if (!lkb)
1496 		return -ENOMEM;
1497 
1498 	lkb->lkb_last_bast_cb_mode = DLM_LOCK_IV;
1499 	lkb->lkb_last_cast_cb_mode = DLM_LOCK_IV;
1500 	lkb->lkb_last_cb_mode = DLM_LOCK_IV;
1501 	lkb->lkb_nodeid = -1;
1502 	lkb->lkb_grmode = DLM_LOCK_IV;
1503 	kref_init(&lkb->lkb_ref);
1504 	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1505 	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1506 
1507 	write_lock_bh(&ls->ls_lkbxa_lock);
1508 	rv = xa_alloc(&ls->ls_lkbxa, &lkb->lkb_id, lkb, limit, GFP_ATOMIC);
1509 	write_unlock_bh(&ls->ls_lkbxa_lock);
1510 
1511 	if (rv < 0) {
1512 		log_error(ls, "create_lkb xa error %d", rv);
1513 		dlm_free_lkb(lkb);
1514 		return rv;
1515 	}
1516 
1517 	*lkb_ret = lkb;
1518 	return 0;
1519 }
1520 
1521 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1522 {
1523 	return _create_lkb(ls, lkb_ret, 1, ULONG_MAX);
1524 }
1525 
1526 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1527 {
1528 	struct dlm_lkb *lkb;
1529 
1530 	rcu_read_lock();
1531 	lkb = xa_load(&ls->ls_lkbxa, lkid);
1532 	if (lkb) {
1533 		/* check if lkb is still part of lkbxa under lkbxa_lock as
1534 		 * the lkb_ref is tight to the lkbxa data structure, see
1535 		 * __put_lkb().
1536 		 */
1537 		read_lock_bh(&ls->ls_lkbxa_lock);
1538 		if (kref_read(&lkb->lkb_ref))
1539 			kref_get(&lkb->lkb_ref);
1540 		else
1541 			lkb = NULL;
1542 		read_unlock_bh(&ls->ls_lkbxa_lock);
1543 	}
1544 	rcu_read_unlock();
1545 
1546 	*lkb_ret = lkb;
1547 	return lkb ? 0 : -ENOENT;
1548 }
1549 
1550 static void kill_lkb(struct kref *kref)
1551 {
1552 	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1553 
1554 	/* All work is done after the return from kref_put() so we
1555 	   can release the write_lock before the detach_lkb */
1556 
1557 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1558 }
1559 
1560 /* __put_lkb() is used when an lkb may not have an rsb attached to
1561    it so we need to provide the lockspace explicitly */
1562 
1563 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1564 {
1565 	uint32_t lkid = lkb->lkb_id;
1566 	int rv;
1567 
1568 	rv = dlm_kref_put_write_lock_bh(&lkb->lkb_ref, kill_lkb,
1569 					&ls->ls_lkbxa_lock);
1570 	if (rv) {
1571 		xa_erase(&ls->ls_lkbxa, lkid);
1572 		write_unlock_bh(&ls->ls_lkbxa_lock);
1573 
1574 		detach_lkb(lkb);
1575 
1576 		/* for local/process lkbs, lvbptr points to caller's lksb */
1577 		if (lkb->lkb_lvbptr && is_master_copy(lkb))
1578 			dlm_free_lvb(lkb->lkb_lvbptr);
1579 		dlm_free_lkb(lkb);
1580 	}
1581 
1582 	return rv;
1583 }
1584 
1585 int dlm_put_lkb(struct dlm_lkb *lkb)
1586 {
1587 	struct dlm_ls *ls;
1588 
1589 	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1590 	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1591 
1592 	ls = lkb->lkb_resource->res_ls;
1593 	return __put_lkb(ls, lkb);
1594 }
1595 
1596 /* This is only called to add a reference when the code already holds
1597    a valid reference to the lkb, so there's no need for locking. */
1598 
1599 static inline void hold_lkb(struct dlm_lkb *lkb)
1600 {
1601 	kref_get(&lkb->lkb_ref);
1602 }
1603 
1604 static void unhold_lkb_assert(struct kref *kref)
1605 {
1606 	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1607 
1608 	DLM_ASSERT(false, dlm_print_lkb(lkb););
1609 }
1610 
1611 /* This is called when we need to remove a reference and are certain
1612    it's not the last ref.  e.g. del_lkb is always called between a
1613    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1614    put_lkb would work fine, but would involve unnecessary locking */
1615 
1616 static inline void unhold_lkb(struct dlm_lkb *lkb)
1617 {
1618 	kref_put(&lkb->lkb_ref, unhold_lkb_assert);
1619 }
1620 
1621 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1622 			    int mode)
1623 {
1624 	struct dlm_lkb *lkb = NULL, *iter;
1625 
1626 	list_for_each_entry(iter, head, lkb_statequeue)
1627 		if (iter->lkb_rqmode < mode) {
1628 			lkb = iter;
1629 			list_add_tail(new, &iter->lkb_statequeue);
1630 			break;
1631 		}
1632 
1633 	if (!lkb)
1634 		list_add_tail(new, head);
1635 }
1636 
1637 /* add/remove lkb to rsb's grant/convert/wait queue */
1638 
1639 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1640 {
1641 	kref_get(&lkb->lkb_ref);
1642 
1643 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1644 
1645 	lkb->lkb_timestamp = ktime_get();
1646 
1647 	lkb->lkb_status = status;
1648 
1649 	switch (status) {
1650 	case DLM_LKSTS_WAITING:
1651 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1652 			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1653 		else
1654 			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1655 		break;
1656 	case DLM_LKSTS_GRANTED:
1657 		/* convention says granted locks kept in order of grmode */
1658 		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1659 				lkb->lkb_grmode);
1660 		break;
1661 	case DLM_LKSTS_CONVERT:
1662 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1663 			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1664 		else
1665 			list_add_tail(&lkb->lkb_statequeue,
1666 				      &r->res_convertqueue);
1667 		break;
1668 	default:
1669 		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1670 	}
1671 }
1672 
1673 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1674 {
1675 	lkb->lkb_status = 0;
1676 	list_del(&lkb->lkb_statequeue);
1677 	unhold_lkb(lkb);
1678 }
1679 
/* move_lkb - requeue @lkb on the queue for @sts: del_lkb() then add_lkb() */
static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	/* unlink first; add_lkb() asserts that lkb_status is clear */
	del_lkb(r, lkb);

	add_lkb(r, lkb, sts);
}
1685 
1686 static int msg_reply_type(int mstype)
1687 {
1688 	switch (mstype) {
1689 	case DLM_MSG_REQUEST:
1690 		return DLM_MSG_REQUEST_REPLY;
1691 	case DLM_MSG_CONVERT:
1692 		return DLM_MSG_CONVERT_REPLY;
1693 	case DLM_MSG_UNLOCK:
1694 		return DLM_MSG_UNLOCK_REPLY;
1695 	case DLM_MSG_CANCEL:
1696 		return DLM_MSG_CANCEL_REPLY;
1697 	case DLM_MSG_LOOKUP:
1698 		return DLM_MSG_LOOKUP_REPLY;
1699 	}
1700 	return -1;
1701 }
1702 
1703 /* add/remove lkb from global waiters list of lkb's waiting for
1704    a reply from a remote node */
1705 
1706 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1707 {
1708 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1709 	int error = 0;
1710 
1711 	spin_lock_bh(&ls->ls_waiters_lock);
1712 
1713 	if (is_overlap_unlock(lkb) ||
1714 	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
1715 		error = -EINVAL;
1716 		goto out;
1717 	}
1718 
1719 	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1720 		switch (mstype) {
1721 		case DLM_MSG_UNLOCK:
1722 			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
1723 			break;
1724 		case DLM_MSG_CANCEL:
1725 			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
1726 			break;
1727 		default:
1728 			error = -EBUSY;
1729 			goto out;
1730 		}
1731 		lkb->lkb_wait_count++;
1732 		hold_lkb(lkb);
1733 
1734 		log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1735 			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
1736 			  lkb->lkb_wait_count, dlm_iflags_val(lkb));
1737 		goto out;
1738 	}
1739 
1740 	DLM_ASSERT(!lkb->lkb_wait_count,
1741 		   dlm_print_lkb(lkb);
1742 		   printk("wait_count %d\n", lkb->lkb_wait_count););
1743 
1744 	lkb->lkb_wait_count++;
1745 	lkb->lkb_wait_type = mstype;
1746 	lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1747 	hold_lkb(lkb);
1748 	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1749  out:
1750 	if (error)
1751 		log_error(ls, "addwait error %x %d flags %x %d %d %s",
1752 			  lkb->lkb_id, error, dlm_iflags_val(lkb), mstype,
1753 			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1754 	spin_unlock_bh(&ls->ls_waiters_lock);
1755 	return error;
1756 }
1757 
/*
 * _remove_from_waiters - account for a reply of type @mstype on @lkb and
 * take the lkb off the waiters list once nothing is outstanding.
 *
 * We clear the RESEND flag because we might be taking an lkb off the waiters
 * list as part of process_requestqueue (e.g. a lookup that has an optimized
 * request reply on the requestqueue) between dlm_recover_waiters_pre() which
 * set RESEND and dlm_recover_waiters_post()
 *
 * @ms may be NULL (remove_from_waiters() passes NULL); it is only used
 * for the convert-reply result check and for the error log.
 *
 * Locking is left to the caller: remove_from_waiters() holds
 * ls_waiters_lock around the call, remove_from_waiters_ms() skips the
 * lock in its "local" case.
 *
 * Returns 0 when a wait was accounted for, -1 when the reply matched
 * nothing that was being waited on.
 */

static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
				const struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int overlap_done = 0;

	/* reply to an overlapping unlock: clears the overlap flag */
	if (mstype == DLM_MSG_UNLOCK_REPLY &&
	    test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
		log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
		overlap_done = 1;
		goto out_del;
	}

	/* reply to an overlapping cancel: clears the overlap flag */
	if (mstype == DLM_MSG_CANCEL_REPLY &&
	    test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
		log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
		overlap_done = 1;
		goto out_del;
	}

	/* Cancel state was preemptively cleared by a successful convert,
	   see next comment, nothing to do. */

	if ((mstype == DLM_MSG_CANCEL_REPLY) &&
	    (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
		log_debug(ls, "remwait %x cancel_reply wait_type %d",
			  lkb->lkb_id, lkb->lkb_wait_type);
		return -1;
	}

	/* Remove for the convert reply, and preemptively remove for the
	   cancel reply.  A convert has been granted while there's still
	   an outstanding cancel on it (the cancel is moot and the result
	   in the cancel reply should be 0).  We preempt the cancel reply
	   because the app gets the convert result and then can follow up
	   with another op, like convert.  This subsequent op would see the
	   lingering state of the cancel and fail with -EBUSY. */

	if ((mstype == DLM_MSG_CONVERT_REPLY) &&
	    (lkb->lkb_wait_type == DLM_MSG_CONVERT) && ms && !ms->m_result &&
	    test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
		log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
			  lkb->lkb_id);
		/* this count/ref is the cancel's; the convert's own is
		   dropped at out_del */
		lkb->lkb_wait_type = 0;
		lkb->lkb_wait_count--;
		unhold_lkb(lkb);
		goto out_del;
	}

	/* N.B. type of reply may not always correspond to type of original
	   msg due to lookup->request optimization, verify others? */

	if (lkb->lkb_wait_type) {
		lkb->lkb_wait_type = 0;
		goto out_del;
	}

	/* nothing was being waited for */
	log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
		  lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
		  lkb->lkb_remid, mstype, dlm_iflags_val(lkb));
	return -1;

 out_del:
	/* the force-unlock/cancel has completed and we haven't recvd a reply
	   to the op that was in progress prior to the unlock/cancel; we
	   give up on any reply to the earlier op.  FIXME: not sure when/how
	   this would happen */

	if (overlap_done && lkb->lkb_wait_type) {
		log_error(ls, "remwait error %x reply %d wait_type %d overlap",
			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
		lkb->lkb_wait_count--;
		unhold_lkb(lkb);
		lkb->lkb_wait_type = 0;
	}

	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

	clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
	lkb->lkb_wait_count--;
	/* unlink from the waiters list once no reply is outstanding */
	if (!lkb->lkb_wait_count)
		list_del_init(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
	return 0;
}
1848 
1849 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1850 {
1851 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1852 	int error;
1853 
1854 	spin_lock_bh(&ls->ls_waiters_lock);
1855 	error = _remove_from_waiters(lkb, mstype, NULL);
1856 	spin_unlock_bh(&ls->ls_waiters_lock);
1857 	return error;
1858 }
1859 
1860 /* Handles situations where we might be processing a "fake" or "local" reply in
1861  * the recovery context which stops any locking activity. Only debugfs might
1862  * change the lockspace waiters but they will held the recovery lock to ensure
1863  * remove_from_waiters_ms() in local case will be the only user manipulating the
1864  * lockspace waiters in recovery context.
1865  */
1866 
1867 static int remove_from_waiters_ms(struct dlm_lkb *lkb,
1868 				  const struct dlm_message *ms, bool local)
1869 {
1870 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1871 	int error;
1872 
1873 	if (!local)
1874 		spin_lock_bh(&ls->ls_waiters_lock);
1875 	else
1876 		WARN_ON_ONCE(!rwsem_is_locked(&ls->ls_in_recovery) ||
1877 			     !dlm_locking_stopped(ls));
1878 	error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
1879 	if (!local)
1880 		spin_unlock_bh(&ls->ls_waiters_lock);
1881 	return error;
1882 }
1883 
1884 /* lkb is master or local copy */
1885 
1886 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1887 {
1888 	int b, len = r->res_ls->ls_lvblen;
1889 
1890 	/* b=1 lvb returned to caller
1891 	   b=0 lvb written to rsb or invalidated
1892 	   b=-1 do nothing */
1893 
1894 	b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1895 
1896 	if (b == 1) {
1897 		if (!lkb->lkb_lvbptr)
1898 			return;
1899 
1900 		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1901 			return;
1902 
1903 		if (!r->res_lvbptr)
1904 			return;
1905 
1906 		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1907 		lkb->lkb_lvbseq = r->res_lvbseq;
1908 
1909 	} else if (b == 0) {
1910 		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1911 			rsb_set_flag(r, RSB_VALNOTVALID);
1912 			return;
1913 		}
1914 
1915 		if (!lkb->lkb_lvbptr)
1916 			return;
1917 
1918 		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1919 			return;
1920 
1921 		if (!r->res_lvbptr)
1922 			r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1923 
1924 		if (!r->res_lvbptr)
1925 			return;
1926 
1927 		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1928 		r->res_lvbseq++;
1929 		lkb->lkb_lvbseq = r->res_lvbseq;
1930 		rsb_clear_flag(r, RSB_VALNOTVALID);
1931 	}
1932 
1933 	if (rsb_flag(r, RSB_VALNOTVALID))
1934 		set_bit(DLM_SBF_VALNOTVALID_BIT, &lkb->lkb_sbflags);
1935 }
1936 
1937 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1938 {
1939 	if (lkb->lkb_grmode < DLM_LOCK_PW)
1940 		return;
1941 
1942 	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1943 		rsb_set_flag(r, RSB_VALNOTVALID);
1944 		return;
1945 	}
1946 
1947 	if (!lkb->lkb_lvbptr)
1948 		return;
1949 
1950 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1951 		return;
1952 
1953 	if (!r->res_lvbptr)
1954 		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1955 
1956 	if (!r->res_lvbptr)
1957 		return;
1958 
1959 	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1960 	r->res_lvbseq++;
1961 	rsb_clear_flag(r, RSB_VALNOTVALID);
1962 }
1963 
1964 /* lkb is process copy (pc) */
1965 
1966 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1967 			    const struct dlm_message *ms)
1968 {
1969 	int b;
1970 
1971 	if (!lkb->lkb_lvbptr)
1972 		return;
1973 
1974 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1975 		return;
1976 
1977 	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1978 	if (b == 1) {
1979 		int len = receive_extralen(ms);
1980 		if (len > r->res_ls->ls_lvblen)
1981 			len = r->res_ls->ls_lvblen;
1982 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1983 		lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
1984 	}
1985 }
1986 
1987 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1988    remove_lock -- used for unlock, removes lkb from granted
1989    revert_lock -- used for cancel, moves lkb from convert to granted
1990    grant_lock  -- used for request and convert, adds lkb to granted or
1991                   moves lkb from convert or waiting to granted
1992 
1993    Each of these is used for master or local copy lkb's.  There is
1994    also a _pc() variation used to make the corresponding change on
1995    a process copy (pc) lkb. */
1996 
1997 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1998 {
1999 	del_lkb(r, lkb);
2000 	lkb->lkb_grmode = DLM_LOCK_IV;
2001 	/* this unhold undoes the original ref from create_lkb()
2002 	   so this leads to the lkb being freed */
2003 	unhold_lkb(lkb);
2004 }
2005 
/*
 * Remove a master or local copy lkb (used for unlock): first let the
 * departing lock update the rsb's lvb, then take it off its queue.
 */
static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* must run before _remove_lock() resets lkb_grmode */
	set_lvb_unlock(r, lkb);

	_remove_lock(r, lkb);
}
2011 
/*
 * Process copy (pc) variant of remove_lock(): the lkb is only taken
 * off its queue, the rsb's lvb is not touched.
 */
static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}
2016 
2017 /* returns: 0 did nothing
2018 	    1 moved lock to granted
2019 	   -1 removed lock */
2020 
2021 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2022 {
2023 	int rv = 0;
2024 
2025 	lkb->lkb_rqmode = DLM_LOCK_IV;
2026 
2027 	switch (lkb->lkb_status) {
2028 	case DLM_LKSTS_GRANTED:
2029 		break;
2030 	case DLM_LKSTS_CONVERT:
2031 		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2032 		rv = 1;
2033 		break;
2034 	case DLM_LKSTS_WAITING:
2035 		del_lkb(r, lkb);
2036 		lkb->lkb_grmode = DLM_LOCK_IV;
2037 		/* this unhold undoes the original ref from create_lkb()
2038 		   so this leads to the lkb being freed */
2039 		unhold_lkb(lkb);
2040 		rv = -1;
2041 		break;
2042 	default:
2043 		log_print("invalid status for revert %d", lkb->lkb_status);
2044 	}
2045 	return rv;
2046 }
2047 
/*
 * Process copy (pc) variant of revert_lock(); the handling and the
 * return values are the same.
 */
static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv = revert_lock(r, lkb);

	return rv;
}
2052 
2053 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2054 {
2055 	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2056 		lkb->lkb_grmode = lkb->lkb_rqmode;
2057 		if (lkb->lkb_status)
2058 			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2059 		else
2060 			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2061 	}
2062 
2063 	lkb->lkb_rqmode = DLM_LOCK_IV;
2064 	lkb->lkb_highbast = 0;
2065 }
2066 
/*
 * Grant a master or local copy lkb (used for request and convert):
 * do the lvb handling against the rsb, then move the lkb to granted.
 */
static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* needs the pre-grant grmode/rqmode, so it runs first */
	set_lvb_lock(r, lkb);

	_grant_lock(r, lkb);
}
2072 
/*
 * Process copy (pc) variant of grant_lock(): the lvb is taken from
 * message ms instead of from the rsb.
 */
static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  const struct dlm_message *ms)
{
	/* needs the pre-grant grmode/rqmode, so it runs first */
	set_lvb_lock_pc(r, lkb, ms);

	_grant_lock(r, lkb);
}
2079 
/*
 * Called by grant_pending_locks(), which means that besides granting
 * the lock, an async grant message must be sent to the requesting node
 * when the lkb belongs to a remote node (i.e. it is a master copy).
 * Otherwise queue_cast() is called with a result of 0.
 */
static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);

	if (!is_master_copy(lkb)) {
		queue_cast(r, lkb, 0);
		return;
	}

	send_grant(r, lkb);
}
2092 
2093 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2094    change the granted/requested modes.  We're munging things accordingly in
2095    the process copy.
2096    CONVDEADLK: our grmode may have been forced down to NL to resolve a
2097    conversion deadlock
2098    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2099    compatible with other granted locks */
2100 
2101 static void munge_demoted(struct dlm_lkb *lkb)
2102 {
2103 	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2104 		log_print("munge_demoted %x invalid modes gr %d rq %d",
2105 			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2106 		return;
2107 	}
2108 
2109 	lkb->lkb_grmode = DLM_LOCK_NL;
2110 }
2111 
2112 static void munge_altmode(struct dlm_lkb *lkb, const struct dlm_message *ms)
2113 {
2114 	if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
2115 	    ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
2116 		log_print("munge_altmode %x invalid reply type %d",
2117 			  lkb->lkb_id, le32_to_cpu(ms->m_type));
2118 		return;
2119 	}
2120 
2121 	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2122 		lkb->lkb_rqmode = DLM_LOCK_PR;
2123 	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2124 		lkb->lkb_rqmode = DLM_LOCK_CW;
2125 	else {
2126 		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2127 		dlm_print_lkb(lkb);
2128 	}
2129 }
2130 
2131 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2132 {
2133 	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2134 					   lkb_statequeue);
2135 	if (lkb->lkb_id == first->lkb_id)
2136 		return 1;
2137 
2138 	return 0;
2139 }
2140 
2141 /* Check if the given lkb conflicts with another lkb on the queue. */
2142 
2143 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2144 {
2145 	struct dlm_lkb *this;
2146 
2147 	list_for_each_entry(this, head, lkb_statequeue) {
2148 		if (this == lkb)
2149 			continue;
2150 		if (!modes_compat(this, lkb))
2151 			return 1;
2152 	}
2153 	return 0;
2154 }
2155 
2156 /*
2157  * "A conversion deadlock arises with a pair of lock requests in the converting
2158  * queue for one resource.  The granted mode of each lock blocks the requested
2159  * mode of the other lock."
2160  *
2161  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2162  * convert queue from being granted, then deadlk/demote lkb.
2163  *
2164  * Example:
2165  * Granted Queue: empty
2166  * Convert Queue: NL->EX (first lock)
2167  *                PR->EX (second lock)
2168  *
2169  * The first lock can't be granted because of the granted mode of the second
2170  * lock and the second lock can't be granted because it's not first in the
2171  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2172  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2173  * flag set and return DEMOTED in the lksb flags.
2174  *
2175  * Originally, this function detected conv-deadlk in a more limited scope:
2176  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2177  * - if lkb1 was the first entry in the queue (not just earlier), and was
2178  *   blocked by the granted mode of lkb2, and there was nothing on the
2179  *   granted queue preventing lkb1 from being granted immediately, i.e.
2180  *   lkb2 was the only thing preventing lkb1 from being granted.
2181  *
2182  * That second condition meant we'd only say there was conv-deadlk if
2183  * resolving it (by demotion) would lead to the first lock on the convert
2184  * queue being granted right away.  It allowed conversion deadlocks to exist
2185  * between locks on the convert queue while they couldn't be granted anyway.
2186  *
2187  * Now, we detect and take action on conversion deadlocks immediately when
2188  * they're created, even if they may not be immediately consequential.  If
2189  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2190  * mode that would prevent lkb1's conversion from being granted, we do a
2191  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2192  * I think this means that the lkb_is_ahead condition below should always
2193  * be zero, i.e. there will never be conv-deadlk between two locks that are
2194  * both already on the convert queue.
2195  */
2196 
2197 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2198 {
2199 	struct dlm_lkb *lkb1;
2200 	int lkb_is_ahead = 0;
2201 
2202 	list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2203 		if (lkb1 == lkb2) {
2204 			lkb_is_ahead = 1;
2205 			continue;
2206 		}
2207 
2208 		if (!lkb_is_ahead) {
2209 			if (!modes_compat(lkb2, lkb1))
2210 				return 1;
2211 		} else {
2212 			if (!modes_compat(lkb2, lkb1) &&
2213 			    !modes_compat(lkb1, lkb2))
2214 				return 1;
2215 		}
2216 	}
2217 	return 0;
2218 }
2219 
2220 /*
2221  * Return 1 if the lock can be granted, 0 otherwise.
2222  * Also detect and resolve conversion deadlocks.
2223  *
2224  * lkb is the lock to be granted
2225  *
2226  * now is 1 if the function is being called in the context of the
2227  * immediate request, it is 0 if called later, after the lock has been
2228  * queued.
2229  *
2230  * recover is 1 if dlm_recover_grant() is trying to grant conversions
2231  * after recovery.
2232  *
2233  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2234  */
2235 
2236 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2237 			   int recover)
2238 {
2239 	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2240 
2241 	/*
2242 	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
2243 	 * a new request for a NL mode lock being blocked.
2244 	 *
2245 	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2246 	 * request, then it would be granted.  In essence, the use of this flag
2247 	 * tells the Lock Manager to expedite theis request by not considering
2248 	 * what may be in the CONVERTING or WAITING queues...  As of this
2249 	 * writing, the EXPEDITE flag can be used only with new requests for NL
2250 	 * mode locks.  This flag is not valid for conversion requests.
2251 	 *
2252 	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
2253 	 * conversion or used with a non-NL requested mode.  We also know an
2254 	 * EXPEDITE request is always granted immediately, so now must always
2255 	 * be 1.  The full condition to grant an expedite request: (now &&
2256 	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2257 	 * therefore be shortened to just checking the flag.
2258 	 */
2259 
2260 	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2261 		return 1;
2262 
2263 	/*
2264 	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2265 	 * added to the remaining conditions.
2266 	 */
2267 
2268 	if (queue_conflict(&r->res_grantqueue, lkb))
2269 		return 0;
2270 
2271 	/*
2272 	 * 6-3: By default, a conversion request is immediately granted if the
2273 	 * requested mode is compatible with the modes of all other granted
2274 	 * locks
2275 	 */
2276 
2277 	if (queue_conflict(&r->res_convertqueue, lkb))
2278 		return 0;
2279 
2280 	/*
2281 	 * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2282 	 * locks for a recovered rsb, on which lkb's have been rebuilt.
2283 	 * The lkb's may have been rebuilt on the queues in a different
2284 	 * order than they were in on the previous master.  So, granting
2285 	 * queued conversions in order after recovery doesn't make sense
2286 	 * since the order hasn't been preserved anyway.  The new order
2287 	 * could also have created a new "in place" conversion deadlock.
2288 	 * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2289 	 * After recovery, there would be no granted locks, and possibly
2290 	 * NL->EX, PR->EX, an in-place conversion deadlock.)  So, after
2291 	 * recovery, grant conversions without considering order.
2292 	 */
2293 
2294 	if (conv && recover)
2295 		return 1;
2296 
2297 	/*
2298 	 * 6-5: But the default algorithm for deciding whether to grant or
2299 	 * queue conversion requests does not by itself guarantee that such
2300 	 * requests are serviced on a "first come first serve" basis.  This, in
2301 	 * turn, can lead to a phenomenon known as "indefinate postponement".
2302 	 *
2303 	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
2304 	 * the system service employed to request a lock conversion.  This flag
2305 	 * forces certain conversion requests to be queued, even if they are
2306 	 * compatible with the granted modes of other locks on the same
2307 	 * resource.  Thus, the use of this flag results in conversion requests
2308 	 * being ordered on a "first come first servce" basis.
2309 	 *
2310 	 * DCT: This condition is all about new conversions being able to occur
2311 	 * "in place" while the lock remains on the granted queue (assuming
2312 	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
2313 	 * doesn't _have_ to go onto the convert queue where it's processed in
2314 	 * order.  The "now" variable is necessary to distinguish converts
2315 	 * being received and processed for the first time now, because once a
2316 	 * convert is moved to the conversion queue the condition below applies
2317 	 * requiring fifo granting.
2318 	 */
2319 
2320 	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2321 		return 1;
2322 
2323 	/*
2324 	 * Even if the convert is compat with all granted locks,
2325 	 * QUECVT forces it behind other locks on the convert queue.
2326 	 */
2327 
2328 	if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2329 		if (list_empty(&r->res_convertqueue))
2330 			return 1;
2331 		else
2332 			return 0;
2333 	}
2334 
2335 	/*
2336 	 * The NOORDER flag is set to avoid the standard vms rules on grant
2337 	 * order.
2338 	 */
2339 
2340 	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2341 		return 1;
2342 
2343 	/*
2344 	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2345 	 * granted until all other conversion requests ahead of it are granted
2346 	 * and/or canceled.
2347 	 */
2348 
2349 	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2350 		return 1;
2351 
2352 	/*
2353 	 * 6-4: By default, a new request is immediately granted only if all
2354 	 * three of the following conditions are satisfied when the request is
2355 	 * issued:
2356 	 * - The queue of ungranted conversion requests for the resource is
2357 	 *   empty.
2358 	 * - The queue of ungranted new requests for the resource is empty.
2359 	 * - The mode of the new request is compatible with the most
2360 	 *   restrictive mode of all granted locks on the resource.
2361 	 */
2362 
2363 	if (now && !conv && list_empty(&r->res_convertqueue) &&
2364 	    list_empty(&r->res_waitqueue))
2365 		return 1;
2366 
2367 	/*
2368 	 * 6-4: Once a lock request is in the queue of ungranted new requests,
2369 	 * it cannot be granted until the queue of ungranted conversion
2370 	 * requests is empty, all ungranted new requests ahead of it are
2371 	 * granted and/or canceled, and it is compatible with the granted mode
2372 	 * of the most restrictive lock granted on the resource.
2373 	 */
2374 
2375 	if (!now && !conv && list_empty(&r->res_convertqueue) &&
2376 	    first_in_list(lkb, &r->res_waitqueue))
2377 		return 1;
2378 
2379 	return 0;
2380 }
2381 
2382 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2383 			  int recover, int *err)
2384 {
2385 	int rv;
2386 	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2387 	int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2388 
2389 	if (err)
2390 		*err = 0;
2391 
2392 	rv = _can_be_granted(r, lkb, now, recover);
2393 	if (rv)
2394 		goto out;
2395 
2396 	/*
2397 	 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2398 	 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2399 	 * cancels one of the locks.
2400 	 */
2401 
2402 	if (is_convert && can_be_queued(lkb) &&
2403 	    conversion_deadlock_detect(r, lkb)) {
2404 		if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2405 			lkb->lkb_grmode = DLM_LOCK_NL;
2406 			set_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
2407 		} else if (err) {
2408 			*err = -EDEADLK;
2409 		} else {
2410 			log_print("can_be_granted deadlock %x now %d",
2411 				  lkb->lkb_id, now);
2412 			dlm_dump_rsb(r);
2413 		}
2414 		goto out;
2415 	}
2416 
2417 	/*
2418 	 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2419 	 * to grant a request in a mode other than the normal rqmode.  It's a
2420 	 * simple way to provide a big optimization to applications that can
2421 	 * use them.
2422 	 */
2423 
2424 	if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2425 		alt = DLM_LOCK_PR;
2426 	else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2427 		alt = DLM_LOCK_CW;
2428 
2429 	if (alt) {
2430 		lkb->lkb_rqmode = alt;
2431 		rv = _can_be_granted(r, lkb, now, 0);
2432 		if (rv)
2433 			set_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
2434 		else
2435 			lkb->lkb_rqmode = rqmode;
2436 	}
2437  out:
2438 	return rv;
2439 }
2440 
2441 /* Returns the highest requested mode of all blocked conversions; sets
2442    cw if there's a blocked conversion to DLM_LOCK_CW. */
2443 
2444 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2445 				 unsigned int *count)
2446 {
2447 	struct dlm_lkb *lkb, *s;
2448 	int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2449 	int hi, demoted, quit, grant_restart, demote_restart;
2450 	int deadlk;
2451 
2452 	quit = 0;
2453  restart:
2454 	grant_restart = 0;
2455 	demote_restart = 0;
2456 	hi = DLM_LOCK_IV;
2457 
2458 	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2459 		demoted = is_demoted(lkb);
2460 		deadlk = 0;
2461 
2462 		if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2463 			grant_lock_pending(r, lkb);
2464 			grant_restart = 1;
2465 			if (count)
2466 				(*count)++;
2467 			continue;
2468 		}
2469 
2470 		if (!demoted && is_demoted(lkb)) {
2471 			log_print("WARN: pending demoted %x node %d %s",
2472 				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2473 			demote_restart = 1;
2474 			continue;
2475 		}
2476 
2477 		if (deadlk) {
2478 			/*
2479 			 * If DLM_LKB_NODLKWT flag is set and conversion
2480 			 * deadlock is detected, we request blocking AST and
2481 			 * down (or cancel) conversion.
2482 			 */
2483 			if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2484 				if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2485 					queue_bast(r, lkb, lkb->lkb_rqmode);
2486 					lkb->lkb_highbast = lkb->lkb_rqmode;
2487 				}
2488 			} else {
2489 				log_print("WARN: pending deadlock %x node %d %s",
2490 					  lkb->lkb_id, lkb->lkb_nodeid,
2491 					  r->res_name);
2492 				dlm_dump_rsb(r);
2493 			}
2494 			continue;
2495 		}
2496 
2497 		hi = max_t(int, lkb->lkb_rqmode, hi);
2498 
2499 		if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2500 			*cw = 1;
2501 	}
2502 
2503 	if (grant_restart)
2504 		goto restart;
2505 	if (demote_restart && !quit) {
2506 		quit = 1;
2507 		goto restart;
2508 	}
2509 
2510 	return max_t(int, high, hi);
2511 }
2512 
2513 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2514 			      unsigned int *count)
2515 {
2516 	struct dlm_lkb *lkb, *s;
2517 
2518 	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2519 		if (can_be_granted(r, lkb, 0, 0, NULL)) {
2520 			grant_lock_pending(r, lkb);
2521 			if (count)
2522 				(*count)++;
2523 		} else {
2524 			high = max_t(int, lkb->lkb_rqmode, high);
2525 			if (lkb->lkb_rqmode == DLM_LOCK_CW)
2526 				*cw = 1;
2527 		}
2528 	}
2529 
2530 	return high;
2531 }
2532 
2533 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2534    on either the convert or waiting queue.
2535    high is the largest rqmode of all locks blocked on the convert or
2536    waiting queue. */
2537 
2538 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2539 {
2540 	if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2541 		if (gr->lkb_highbast < DLM_LOCK_EX)
2542 			return 1;
2543 		return 0;
2544 	}
2545 
2546 	if (gr->lkb_highbast < high &&
2547 	    !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2548 		return 1;
2549 	return 0;
2550 }
2551 
2552 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2553 {
2554 	struct dlm_lkb *lkb, *s;
2555 	int high = DLM_LOCK_IV;
2556 	int cw = 0;
2557 
2558 	if (!is_master(r)) {
2559 		log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2560 		dlm_dump_rsb(r);
2561 		return;
2562 	}
2563 
2564 	high = grant_pending_convert(r, high, &cw, count);
2565 	high = grant_pending_wait(r, high, &cw, count);
2566 
2567 	if (high == DLM_LOCK_IV)
2568 		return;
2569 
2570 	/*
2571 	 * If there are locks left on the wait/convert queue then send blocking
2572 	 * ASTs to granted locks based on the largest requested mode (high)
2573 	 * found above.
2574 	 */
2575 
2576 	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2577 		if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2578 			if (cw && high == DLM_LOCK_PR &&
2579 			    lkb->lkb_grmode == DLM_LOCK_PR)
2580 				queue_bast(r, lkb, DLM_LOCK_CW);
2581 			else
2582 				queue_bast(r, lkb, high);
2583 			lkb->lkb_highbast = high;
2584 		}
2585 	}
2586 }
2587 
2588 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2589 {
2590 	if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2591 	    (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2592 		if (gr->lkb_highbast < DLM_LOCK_EX)
2593 			return 1;
2594 		return 0;
2595 	}
2596 
2597 	if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2598 		return 1;
2599 	return 0;
2600 }
2601 
2602 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2603 			    struct dlm_lkb *lkb)
2604 {
2605 	struct dlm_lkb *gr;
2606 
2607 	list_for_each_entry(gr, head, lkb_statequeue) {
2608 		/* skip self when sending basts to convertqueue */
2609 		if (gr == lkb)
2610 			continue;
2611 		if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2612 			queue_bast(r, gr, lkb->lkb_rqmode);
2613 			gr->lkb_highbast = lkb->lkb_rqmode;
2614 		}
2615 	}
2616 }
2617 
2618 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2619 {
2620 	send_bast_queue(r, &r->res_grantqueue, lkb);
2621 }
2622 
2623 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2624 {
2625 	send_bast_queue(r, &r->res_grantqueue, lkb);
2626 	send_bast_queue(r, &r->res_convertqueue, lkb);
2627 }
2628 
2629 /* set_master(r, lkb) -- set the master nodeid of a resource
2630 
2631    The purpose of this function is to set the nodeid field in the given
2632    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2633    known, it can just be copied to the lkb and the function will return
2634    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2635    before it can be copied to the lkb.
2636 
2637    When the rsb nodeid is being looked up remotely, the initial lkb
2638    causing the lookup is kept on the ls_waiters list waiting for the
2639    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2640    on the rsb's res_lookup list until the master is verified.
2641 
2642    Return values:
2643    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2644    1: the rsb master is not available and the lkb has been placed on
2645       a wait queue
2646 */
2647 
2648 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2649 {
2650 	int our_nodeid = dlm_our_nodeid();
2651 
2652 	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2653 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2654 		r->res_first_lkid = lkb->lkb_id;
2655 		lkb->lkb_nodeid = r->res_nodeid;
2656 		return 0;
2657 	}
2658 
2659 	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2660 		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2661 		return 1;
2662 	}
2663 
2664 	if (r->res_master_nodeid == our_nodeid) {
2665 		lkb->lkb_nodeid = 0;
2666 		return 0;
2667 	}
2668 
2669 	if (r->res_master_nodeid) {
2670 		lkb->lkb_nodeid = r->res_master_nodeid;
2671 		return 0;
2672 	}
2673 
2674 	if (dlm_dir_nodeid(r) == our_nodeid) {
2675 		/* This is a somewhat unusual case; find_rsb will usually
2676 		   have set res_master_nodeid when dir nodeid is local, but
2677 		   there are cases where we become the dir node after we've
2678 		   past find_rsb and go through _request_lock again.
2679 		   confirm_master() or process_lookup_list() needs to be
2680 		   called after this. */
2681 		log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2682 			  lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2683 			  r->res_name);
2684 		r->res_master_nodeid = our_nodeid;
2685 		r->res_nodeid = 0;
2686 		lkb->lkb_nodeid = 0;
2687 		return 0;
2688 	}
2689 
2690 	r->res_first_lkid = lkb->lkb_id;
2691 	send_lookup(r, lkb);
2692 	return 1;
2693 }
2694 
2695 static void process_lookup_list(struct dlm_rsb *r)
2696 {
2697 	struct dlm_lkb *lkb, *safe;
2698 
2699 	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2700 		list_del_init(&lkb->lkb_rsb_lookup);
2701 		_request_lock(r, lkb);
2702 	}
2703 }
2704 
2705 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2706 
2707 static void confirm_master(struct dlm_rsb *r, int error)
2708 {
2709 	struct dlm_lkb *lkb;
2710 
2711 	if (!r->res_first_lkid)
2712 		return;
2713 
2714 	switch (error) {
2715 	case 0:
2716 	case -EINPROGRESS:
2717 		r->res_first_lkid = 0;
2718 		process_lookup_list(r);
2719 		break;
2720 
2721 	case -EAGAIN:
2722 	case -EBADR:
2723 	case -ENOTBLK:
2724 		/* the remote request failed and won't be retried (it was
2725 		   a NOQUEUE, or has been canceled/unlocked); make a waiting
2726 		   lkb the first_lkid */
2727 
2728 		r->res_first_lkid = 0;
2729 
2730 		if (!list_empty(&r->res_lookup)) {
2731 			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2732 					 lkb_rsb_lookup);
2733 			list_del_init(&lkb->lkb_rsb_lookup);
2734 			r->res_first_lkid = lkb->lkb_id;
2735 			_request_lock(r, lkb);
2736 		}
2737 		break;
2738 
2739 	default:
2740 		log_error(r->res_ls, "confirm_master unknown error %d", error);
2741 	}
2742 }
2743 
2744 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2745 			 int namelen, void (*ast)(void *astparam),
2746 			 void *astparam,
2747 			 void (*bast)(void *astparam, int mode),
2748 			 struct dlm_args *args)
2749 {
2750 	int rv = -EINVAL;
2751 
2752 	/* check for invalid arg usage */
2753 
2754 	if (mode < 0 || mode > DLM_LOCK_EX)
2755 		goto out;
2756 
2757 	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2758 		goto out;
2759 
2760 	if (flags & DLM_LKF_CANCEL)
2761 		goto out;
2762 
2763 	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2764 		goto out;
2765 
2766 	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2767 		goto out;
2768 
2769 	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2770 		goto out;
2771 
2772 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2773 		goto out;
2774 
2775 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2776 		goto out;
2777 
2778 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2779 		goto out;
2780 
2781 	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2782 		goto out;
2783 
2784 	if (!ast || !lksb)
2785 		goto out;
2786 
2787 	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2788 		goto out;
2789 
2790 	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2791 		goto out;
2792 
2793 	/* these args will be copied to the lkb in validate_lock_args,
2794 	   it cannot be done now because when converting locks, fields in
2795 	   an active lkb cannot be modified before locking the rsb */
2796 
2797 	args->flags = flags;
2798 	args->astfn = ast;
2799 	args->astparam = astparam;
2800 	args->bastfn = bast;
2801 	args->mode = mode;
2802 	args->lksb = lksb;
2803 	rv = 0;
2804  out:
2805 	return rv;
2806 }
2807 
2808 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2809 {
2810 	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2811  		      DLM_LKF_FORCEUNLOCK))
2812 		return -EINVAL;
2813 
2814 	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2815 		return -EINVAL;
2816 
2817 	args->flags = flags;
2818 	args->astparam = astarg;
2819 	return 0;
2820 }
2821 
2822 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2823 			      struct dlm_args *args)
2824 {
2825 	int rv = -EBUSY;
2826 
2827 	if (args->flags & DLM_LKF_CONVERT) {
2828 		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2829 			goto out;
2830 
2831 		/* lock not allowed if there's any op in progress */
2832 		if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2833 			goto out;
2834 
2835 		if (is_overlap(lkb))
2836 			goto out;
2837 
2838 		rv = -EINVAL;
2839 		if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
2840 			goto out;
2841 
2842 		if (args->flags & DLM_LKF_QUECVT &&
2843 		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2844 			goto out;
2845 	}
2846 
2847 	lkb->lkb_exflags = args->flags;
2848 	dlm_set_sbflags_val(lkb, 0);
2849 	lkb->lkb_astfn = args->astfn;
2850 	lkb->lkb_astparam = args->astparam;
2851 	lkb->lkb_bastfn = args->bastfn;
2852 	lkb->lkb_rqmode = args->mode;
2853 	lkb->lkb_lksb = args->lksb;
2854 	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2855 	lkb->lkb_ownpid = (int) current->pid;
2856 	rv = 0;
2857  out:
2858 	switch (rv) {
2859 	case 0:
2860 		break;
2861 	case -EINVAL:
2862 		/* annoy the user because dlm usage is wrong */
2863 		WARN_ON(1);
2864 		log_error(ls, "%s %d %x %x %x %d %d %s", __func__,
2865 			  rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2866 			  lkb->lkb_status, lkb->lkb_wait_type,
2867 			  lkb->lkb_resource->res_name);
2868 		break;
2869 	default:
2870 		log_debug(ls, "%s %d %x %x %x %d %d %s", __func__,
2871 			  rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2872 			  lkb->lkb_status, lkb->lkb_wait_type,
2873 			  lkb->lkb_resource->res_name);
2874 		break;
2875 	}
2876 
2877 	return rv;
2878 }
2879 
2880 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2881    for success */
2882 
2883 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2884    because there may be a lookup in progress and it's valid to do
2885    cancel/unlockf on it */
2886 
/*
 * validate_unlock_args() - decide whether the unlock, force-unlock or cancel
 * described by @args may proceed on @lkb.
 *
 * Return: 0 if the operation may proceed; in that case args->flags has been
 * OR'ed into lkb_exflags, the sbflags value has been reset to 0 and
 * lkb_astparam has been taken from @args.  Otherwise:
 *   -EBUSY   an operation is already in progress on the lkb, the lkb was
 *            waiting on an rsb lookup, there was nothing to cancel, or an
 *            OVERLAP_CANCEL/OVERLAP_UNLOCK flag was set on the lkb instead
 *   -EINVAL  the lkb is a master copy, or the cancel/force-unlock conflicts
 *            with a cancel/unlock that is already recorded or in flight
 *   -ENOENT  the lkb is marked end-of-life
 *
 * -EINVAL is reported with WARN_ON() and log_error(); every other non-zero
 * result is logged with log_debug().
 */
static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int rv = -EBUSY;

	/* normal unlock not allowed if there's any op in progress */
	if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) &&
	    (lkb->lkb_wait_type || lkb->lkb_wait_count))
		goto out;

	/* an lkb may be waiting for an rsb lookup to complete where the
	   lookup was initiated by another lock */

	if (!list_empty(&lkb->lkb_rsb_lookup)) {
		if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
			list_del_init(&lkb->lkb_rsb_lookup);
			queue_cast(lkb->lkb_resource, lkb,
				   args->flags & DLM_LKF_CANCEL ?
				   -DLM_ECANCEL : -DLM_EUNLOCK);
			unhold_lkb(lkb); /* undoes create_lkb() */
		}
		/* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
		goto out;
	}

	/* from here on, a bare "goto out" reports -EINVAL unless rv is
	   explicitly changed first */
	rv = -EINVAL;
	if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
		dlm_print_lkb(lkb);
		goto out;
	}

	/* an lkb may still exist even though the lock is EOL'ed due to a
	 * cancel, unlock or failed noqueue request; an app can't use these
	 * locks; return same error as if the lkid had not been found at all
	 */

	if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
		rv = -ENOENT;
		goto out;
	}

	/* cancel not allowed with another cancel/unlock in progress */

	if (args->flags & DLM_LKF_CANCEL) {
		/* a cancel is already recorded in exflags: -EINVAL */
		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
			goto out;

		if (is_overlap(lkb))
			goto out;

		/* lkb is flagged RESEND: only mark the overlapping cancel
		   and report -EBUSY */
		if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
			rv = -EBUSY;
			goto out;
		}

		/* there's nothing to cancel */
		if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
		    !lkb->lkb_wait_type) {
			rv = -EBUSY;
			goto out;
		}

		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
		case DLM_MSG_CANCEL:
			/* -EINVAL: an unlock or cancel is already waiting */
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_CANCEL */
		goto out_ok;
	}

	/* do we need to allow a force-unlock if there's a normal unlock
	   already in progress?  in what conditions could the normal unlock
	   fail such that we'd want to send a force-unlock to be sure? */

	if (args->flags & DLM_LKF_FORCEUNLOCK) {
		/* a force-unlock is already recorded in exflags: -EINVAL */
		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
			goto out;

		if (is_overlap_unlock(lkb))
			goto out;

		/* lkb is flagged RESEND: only mark the overlapping unlock
		   and report -EBUSY */
		if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
			rv = -EBUSY;
			goto out;
		}

		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
			/* -EINVAL: an unlock is already waiting */
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_UNLOCK */
	}

 out_ok:
	/* an overlapping op shouldn't blow away exflags from other op */
	lkb->lkb_exflags |= args->flags;
	dlm_set_sbflags_val(lkb, 0);
	lkb->lkb_astparam = args->astparam;
	rv = 0;
 out:
	/* report the outcome; the log level depends on the result */
	switch (rv) {
	case 0:
		break;
	case -EINVAL:
		/* annoy the user because dlm usage is wrong */
		WARN_ON(1);
		log_error(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
			  lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
			  args->flags, lkb->lkb_wait_type,
			  lkb->lkb_resource->res_name);
		break;
	default:
		log_debug(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
			  lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
			  args->flags, lkb->lkb_wait_type,
			  lkb->lkb_resource->res_name);
		break;
	}

	return rv;
}
3024 
3025 /*
3026  * Four stage 4 varieties:
3027  * do_request(), do_convert(), do_unlock(), do_cancel()
3028  * These are called on the master node for the given lock and
3029  * from the central locking logic.
3030  */
3031 
3032 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3033 {
3034 	int error = 0;
3035 
3036 	if (can_be_granted(r, lkb, 1, 0, NULL)) {
3037 		grant_lock(r, lkb);
3038 		queue_cast(r, lkb, 0);
3039 		goto out;
3040 	}
3041 
3042 	if (can_be_queued(lkb)) {
3043 		error = -EINPROGRESS;
3044 		add_lkb(r, lkb, DLM_LKSTS_WAITING);
3045 		goto out;
3046 	}
3047 
3048 	error = -EAGAIN;
3049 	queue_cast(r, lkb, -EAGAIN);
3050  out:
3051 	return error;
3052 }
3053 
3054 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3055 			       int error)
3056 {
3057 	switch (error) {
3058 	case -EAGAIN:
3059 		if (force_blocking_asts(lkb))
3060 			send_blocking_asts_all(r, lkb);
3061 		break;
3062 	case -EINPROGRESS:
3063 		send_blocking_asts(r, lkb);
3064 		break;
3065 	}
3066 }
3067 
3068 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3069 {
3070 	int error = 0;
3071 	int deadlk = 0;
3072 
3073 	/* changing an existing lock may allow others to be granted */
3074 
3075 	if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3076 		grant_lock(r, lkb);
3077 		queue_cast(r, lkb, 0);
3078 		goto out;
3079 	}
3080 
3081 	/* can_be_granted() detected that this lock would block in a conversion
3082 	   deadlock, so we leave it on the granted queue and return EDEADLK in
3083 	   the ast for the convert. */
3084 
3085 	if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
3086 		/* it's left on the granted queue */
3087 		revert_lock(r, lkb);
3088 		queue_cast(r, lkb, -EDEADLK);
3089 		error = -EDEADLK;
3090 		goto out;
3091 	}
3092 
3093 	/* is_demoted() means the can_be_granted() above set the grmode
3094 	   to NL, and left us on the granted queue.  This auto-demotion
3095 	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
3096 	   now grantable.  We have to try to grant other converting locks
3097 	   before we try again to grant this one. */
3098 
3099 	if (is_demoted(lkb)) {
3100 		grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3101 		if (_can_be_granted(r, lkb, 1, 0)) {
3102 			grant_lock(r, lkb);
3103 			queue_cast(r, lkb, 0);
3104 			goto out;
3105 		}
3106 		/* else fall through and move to convert queue */
3107 	}
3108 
3109 	if (can_be_queued(lkb)) {
3110 		error = -EINPROGRESS;
3111 		del_lkb(r, lkb);
3112 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3113 		goto out;
3114 	}
3115 
3116 	error = -EAGAIN;
3117 	queue_cast(r, lkb, -EAGAIN);
3118  out:
3119 	return error;
3120 }
3121 
3122 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3123 			       int error)
3124 {
3125 	switch (error) {
3126 	case 0:
3127 		grant_pending_locks(r, NULL);
3128 		/* grant_pending_locks also sends basts */
3129 		break;
3130 	case -EAGAIN:
3131 		if (force_blocking_asts(lkb))
3132 			send_blocking_asts_all(r, lkb);
3133 		break;
3134 	case -EINPROGRESS:
3135 		send_blocking_asts(r, lkb);
3136 		break;
3137 	}
3138 }
3139 
3140 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3141 {
3142 	remove_lock(r, lkb);
3143 	queue_cast(r, lkb, -DLM_EUNLOCK);
3144 	return -DLM_EUNLOCK;
3145 }
3146 
/*
 * do_unlock_effects() - follow-up work after do_unlock(): unconditionally
 * calls grant_pending_locks() on the rsb.  @lkb and @error are not used.
 */
static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			      int error)
{
	grant_pending_locks(r, NULL);
}
3152 
3153 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3154 
3155 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3156 {
3157 	int error;
3158 
3159 	error = revert_lock(r, lkb);
3160 	if (error) {
3161 		queue_cast(r, lkb, -DLM_ECANCEL);
3162 		return -DLM_ECANCEL;
3163 	}
3164 	return 0;
3165 }
3166 
3167 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3168 			      int error)
3169 {
3170 	if (error)
3171 		grant_pending_locks(r, NULL);
3172 }
3173 
3174 /*
3175  * Four stage 3 varieties:
3176  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3177  */
3178 
/*
 * _request_lock() - stage 3 of a request: add a new lkb to a possibly new
 * rsb; called by the requesting process.
 *
 * Return: a negative value from set_master() is passed through; a positive
 * value from set_master() ends the function with 0 without sending or
 * doing anything further here.  Otherwise the result of send_request()
 * (remote rsb) or do_request() (local rsb) is returned.
 */

static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	/* set_master: sets lkb nodeid from r */
	rv = set_master(r, lkb);
	if (rv < 0)
		return rv;
	if (rv > 0)
		return 0;

	/* receive_request() calls do_request() on remote node */
	if (is_remote(r))
		return send_request(r, lkb);

	/* for remote locks the request_reply is sent between do_request
	   and do_request_effects */
	rv = do_request(r, lkb);
	do_request_effects(r, lkb, rv);
	return rv;
}
3207 
/*
 * _convert_lock() - stage 3 of a conversion: change some property of an
 * existing lkb, e.g. its mode.
 *
 * Return: the result of send_convert() when the rsb is remote, otherwise
 * the result of do_convert().
 */

static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	/* receive_convert() calls do_convert() on remote node */
	if (is_remote(r))
		return send_convert(r, lkb);

	/* for remote locks the convert_reply is sent between do_convert
	   and do_convert_effects */
	rv = do_convert(r, lkb);
	do_convert_effects(r, lkb, rv);
	return rv;
}
3226 
/*
 * _unlock_lock() - stage 3 of an unlock: remove an existing lkb from the
 * granted queue.
 *
 * Return: the result of send_unlock() when the rsb is remote, otherwise
 * the result of do_unlock().
 */

static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	/* receive_unlock() calls do_unlock() on remote node */
	if (is_remote(r))
		return send_unlock(r, lkb);

	/* for remote locks the unlock_reply is sent between do_unlock
	   and do_unlock_effects */
	rv = do_unlock(r, lkb);
	do_unlock_effects(r, lkb, rv);
	return rv;
}
3245 
/*
 * _cancel_lock() - stage 3 of a cancel: remove an existing lkb from the
 * convert or wait queue.
 *
 * Return: the result of send_cancel() when the rsb is remote, otherwise
 * the result of do_cancel().
 */

static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	/* receive_cancel() calls do_cancel() on remote node */
	if (is_remote(r))
		return send_cancel(r, lkb);

	/* for remote locks the cancel_reply is sent between do_cancel
	   and do_cancel_effects */
	rv = do_cancel(r, lkb);
	do_cancel_effects(r, lkb, rv);
	return rv;
}
3264 
3265 /*
3266  * Four stage 2 varieties:
3267  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3268  */
3269 
3270 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3271 			const void *name, int len,
3272 			struct dlm_args *args)
3273 {
3274 	struct dlm_rsb *r;
3275 	int error;
3276 
3277 	error = validate_lock_args(ls, lkb, args);
3278 	if (error)
3279 		return error;
3280 
3281 	error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3282 	if (error)
3283 		return error;
3284 
3285 	lock_rsb(r);
3286 
3287 	attach_lkb(r, lkb);
3288 	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3289 
3290 	error = _request_lock(r, lkb);
3291 
3292 	unlock_rsb(r);
3293 	put_rsb(r);
3294 	return error;
3295 }
3296 
3297 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3298 			struct dlm_args *args)
3299 {
3300 	struct dlm_rsb *r;
3301 	int error;
3302 
3303 	r = lkb->lkb_resource;
3304 
3305 	hold_rsb(r);
3306 	lock_rsb(r);
3307 
3308 	error = validate_lock_args(ls, lkb, args);
3309 	if (error)
3310 		goto out;
3311 
3312 	error = _convert_lock(r, lkb);
3313  out:
3314 	unlock_rsb(r);
3315 	put_rsb(r);
3316 	return error;
3317 }
3318 
3319 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3320 		       struct dlm_args *args)
3321 {
3322 	struct dlm_rsb *r;
3323 	int error;
3324 
3325 	r = lkb->lkb_resource;
3326 
3327 	hold_rsb(r);
3328 	lock_rsb(r);
3329 
3330 	error = validate_unlock_args(lkb, args);
3331 	if (error)
3332 		goto out;
3333 
3334 	error = _unlock_lock(r, lkb);
3335  out:
3336 	unlock_rsb(r);
3337 	put_rsb(r);
3338 	return error;
3339 }
3340 
3341 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3342 		       struct dlm_args *args)
3343 {
3344 	struct dlm_rsb *r;
3345 	int error;
3346 
3347 	r = lkb->lkb_resource;
3348 
3349 	hold_rsb(r);
3350 	lock_rsb(r);
3351 
3352 	error = validate_unlock_args(lkb, args);
3353 	if (error)
3354 		goto out;
3355 
3356 	error = _cancel_lock(r, lkb);
3357  out:
3358 	unlock_rsb(r);
3359 	put_rsb(r);
3360 	return error;
3361 }
3362 
3363 /*
3364  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
3365  */
3366 
/*
 * dlm_lock() - stage 1 entry point for new lock requests and conversions.
 *
 * With DLM_LKF_CONVERT set in @flags the existing lkb is looked up through
 * lksb->sb_lkid and passed to convert_lock(); otherwise a new lkb is
 * created and passed to request_lock() together with @name/@namelen.
 * @parent_lkid is not used by this function.
 *
 * Return: 0 on success, which includes operations that ended with
 * -EINPROGRESS, -EAGAIN or -EDEADLK (all three are mapped to 0 here);
 * -EINVAL when the lockspace cannot be found; otherwise the error from
 * find_lkb()/create_lkb(), set_lock_args() or the stage 2 function.
 */
int dlm_lock(dlm_lockspace_t *lockspace,
	     int mode,
	     struct dlm_lksb *lksb,
	     uint32_t flags,
	     const void *name,
	     unsigned int namelen,
	     uint32_t parent_lkid,
	     void (*ast) (void *astarg),
	     void *astarg,
	     void (*bast) (void *astarg, int mode))
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error, convert = flags & DLM_LKF_CONVERT;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	dlm_lock_recovery(ls);

	/* a conversion works on an existing lkb, a request on a new one */
	if (convert)
		error = find_lkb(ls, lksb->sb_lkid, &lkb);
	else
		error = create_lkb(ls, &lkb);

	if (error)
		goto out;

	trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);

	error = set_lock_args(mode, lksb, flags, namelen, ast, astarg, bast,
			      &args);
	if (error)
		goto out_put;

	if (convert)
		error = convert_lock(ls, lkb, &args);
	else
		error = request_lock(ls, lkb, name, namelen, &args);

	if (error == -EINPROGRESS)
		error = 0;
 out_put:
	trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true);

	/* this test runs before -EAGAIN/-EDEADLK are mapped to 0 below, so
	   those outcomes also drop the lkb reference here */
	if (convert || error)
		__put_lkb(ls, lkb);
	if (error == -EAGAIN || error == -EDEADLK)
		error = 0;
 out:
	dlm_unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}
3423 
/*
 * dlm_unlock() - stage 1 entry point for unlock, force-unlock and cancel.
 *
 * The lkb is looked up by @lkid.  With DLM_LKF_CANCEL set in @flags it is
 * passed to cancel_lock(), otherwise to unlock_lock().  @lksb is not used
 * by this function.
 *
 * Return: 0 on success, which includes -DLM_EUNLOCK and -DLM_ECANCEL from
 * the stage 2 function, and also -EBUSY when @flags contains
 * DLM_LKF_CANCEL or DLM_LKF_FORCEUNLOCK; -EINVAL when the lockspace cannot
 * be found; otherwise the error from find_lkb(), set_unlock_args() or the
 * stage 2 function.
 */
int dlm_unlock(dlm_lockspace_t *lockspace,
	       uint32_t lkid,
	       uint32_t flags,
	       struct dlm_lksb *lksb,
	       void *astarg)
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	trace_dlm_unlock_start(ls, lkb, flags);

	error = set_unlock_args(flags, astarg, &args);
	if (error)
		goto out_put;

	if (flags & DLM_LKF_CANCEL)
		error = cancel_lock(ls, lkb, &args);
	else
		error = unlock_lock(ls, lkb, &args);

	/* these two results mean the operation was carried out */
	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
		error = 0;
	/* -EBUSY counts as success for cancel and force-unlock */
	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
		error = 0;
 out_put:
	trace_dlm_unlock_end(ls, lkb, flags, error);

	/* drops the reference obtained by find_lkb() above —
	   NOTE(review): presumed pairing, confirm in find_lkb() */
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}
3469 
3470 /*
3471  * send/receive routines for remote operations and replies
3472  *
3473  * send_args
3474  * send_common
3475  * send_request			receive_request
3476  * send_convert			receive_convert
3477  * send_unlock			receive_unlock
3478  * send_cancel			receive_cancel
3479  * send_grant			receive_grant
3480  * send_bast			receive_bast
3481  * send_lookup			receive_lookup
3482  * send_remove			receive_remove
3483  *
3484  * 				send_common_reply
3485  * receive_request_reply	send_request_reply
3486  * receive_convert_reply	send_convert_reply
3487  * receive_unlock_reply		send_unlock_reply
3488  * receive_cancel_reply		send_cancel_reply
3489  * receive_lookup_reply		send_lookup_reply
3490  */
3491 
3492 static int _create_message(struct dlm_ls *ls, int mb_len,
3493 			   int to_nodeid, int mstype,
3494 			   struct dlm_message **ms_ret,
3495 			   struct dlm_mhandle **mh_ret)
3496 {
3497 	struct dlm_message *ms;
3498 	struct dlm_mhandle *mh;
3499 	char *mb;
3500 
3501 	/* get_buffer gives us a message handle (mh) that we need to
3502 	   pass into midcomms_commit and a message buffer (mb) that we
3503 	   write our data into */
3504 
3505 	mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb);
3506 	if (!mh)
3507 		return -ENOBUFS;
3508 
3509 	ms = (struct dlm_message *) mb;
3510 
3511 	ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3512 	ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
3513 	ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
3514 	ms->m_header.h_length = cpu_to_le16(mb_len);
3515 	ms->m_header.h_cmd = DLM_MSG;
3516 
3517 	ms->m_type = cpu_to_le32(mstype);
3518 
3519 	*mh_ret = mh;
3520 	*ms_ret = ms;
3521 	return 0;
3522 }
3523 
3524 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3525 			  int to_nodeid, int mstype,
3526 			  struct dlm_message **ms_ret,
3527 			  struct dlm_mhandle **mh_ret)
3528 {
3529 	int mb_len = sizeof(struct dlm_message);
3530 
3531 	switch (mstype) {
3532 	case DLM_MSG_REQUEST:
3533 	case DLM_MSG_LOOKUP:
3534 	case DLM_MSG_REMOVE:
3535 		mb_len += r->res_length;
3536 		break;
3537 	case DLM_MSG_CONVERT:
3538 	case DLM_MSG_UNLOCK:
3539 	case DLM_MSG_REQUEST_REPLY:
3540 	case DLM_MSG_CONVERT_REPLY:
3541 	case DLM_MSG_GRANT:
3542 		if (lkb && lkb->lkb_lvbptr && (lkb->lkb_exflags & DLM_LKF_VALBLK))
3543 			mb_len += r->res_ls->ls_lvblen;
3544 		break;
3545 	}
3546 
3547 	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3548 			       ms_ret, mh_ret);
3549 }
3550 
/*
 * send_message() - commit a previously created message handle together
 * with the resource name.  @ms is not used here.
 *
 * Return: always 0.  Further lowcomms enhancements or alternate
 * implementations may make the return value useful at some point.
 */

static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms,
			const void *name, int namelen)
{
	dlm_midcomms_commit_mhandle(mh, name, namelen);
	return 0;
}
3560 
3561 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3562 		      struct dlm_message *ms)
3563 {
3564 	ms->m_nodeid   = cpu_to_le32(lkb->lkb_nodeid);
3565 	ms->m_pid      = cpu_to_le32(lkb->lkb_ownpid);
3566 	ms->m_lkid     = cpu_to_le32(lkb->lkb_id);
3567 	ms->m_remid    = cpu_to_le32(lkb->lkb_remid);
3568 	ms->m_exflags  = cpu_to_le32(lkb->lkb_exflags);
3569 	ms->m_sbflags  = cpu_to_le32(dlm_sbflags_val(lkb));
3570 	ms->m_flags    = cpu_to_le32(dlm_dflags_val(lkb));
3571 	ms->m_lvbseq   = cpu_to_le32(lkb->lkb_lvbseq);
3572 	ms->m_status   = cpu_to_le32(lkb->lkb_status);
3573 	ms->m_grmode   = cpu_to_le32(lkb->lkb_grmode);
3574 	ms->m_rqmode   = cpu_to_le32(lkb->lkb_rqmode);
3575 	ms->m_hash     = cpu_to_le32(r->res_hash);
3576 
3577 	/* m_result and m_bastmode are set from function args,
3578 	   not from lkb fields */
3579 
3580 	if (lkb->lkb_bastfn)
3581 		ms->m_asts |= cpu_to_le32(DLM_CB_BAST);
3582 	if (lkb->lkb_astfn)
3583 		ms->m_asts |= cpu_to_le32(DLM_CB_CAST);
3584 
3585 	/* compare with switch in create_message; send_remove() doesn't
3586 	   use send_args() */
3587 
3588 	switch (ms->m_type) {
3589 	case cpu_to_le32(DLM_MSG_REQUEST):
3590 	case cpu_to_le32(DLM_MSG_LOOKUP):
3591 		memcpy(ms->m_extra, r->res_name, r->res_length);
3592 		break;
3593 	case cpu_to_le32(DLM_MSG_CONVERT):
3594 	case cpu_to_le32(DLM_MSG_UNLOCK):
3595 	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3596 	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3597 	case cpu_to_le32(DLM_MSG_GRANT):
3598 		if (!lkb->lkb_lvbptr || !(lkb->lkb_exflags & DLM_LKF_VALBLK))
3599 			break;
3600 		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3601 		break;
3602 	}
3603 }
3604 
3605 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3606 {
3607 	struct dlm_message *ms;
3608 	struct dlm_mhandle *mh;
3609 	int to_nodeid, error;
3610 
3611 	to_nodeid = r->res_nodeid;
3612 
3613 	error = add_to_waiters(lkb, mstype, to_nodeid);
3614 	if (error)
3615 		return error;
3616 
3617 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3618 	if (error)
3619 		goto fail;
3620 
3621 	send_args(r, lkb, ms);
3622 
3623 	error = send_message(mh, ms, r->res_name, r->res_length);
3624 	if (error)
3625 		goto fail;
3626 	return 0;
3627 
3628  fail:
3629 	remove_from_waiters(lkb, msg_reply_type(mstype));
3630 	return error;
3631 }
3632 
/* send a DLM_MSG_REQUEST for lkb through send_common(); returns its result */
static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_REQUEST);
}
3637 
3638 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3639 {
3640 	int error;
3641 
3642 	error = send_common(r, lkb, DLM_MSG_CONVERT);
3643 
3644 	/* down conversions go without a reply from the master */
3645 	if (!error && down_conversion(lkb)) {
3646 		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3647 		r->res_ls->ls_local_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
3648 		r->res_ls->ls_local_ms.m_result = 0;
3649 		__receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms, true);
3650 	}
3651 
3652 	return error;
3653 }
3654 
/*
 * send_unlock() - send a DLM_MSG_UNLOCK for lkb through send_common() and
 * return its result.
 *
 * FIXME: if this lkb is the only lock we hold on the rsb, then set
 * MASTER_UNCERTAIN to force the next request on the rsb to confirm
 * that the master is still correct.
 */

static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_UNLOCK);
}
3663 
/* send a DLM_MSG_CANCEL for lkb through send_common(); returns its result */
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_CANCEL);
}
3668 
3669 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3670 {
3671 	struct dlm_message *ms;
3672 	struct dlm_mhandle *mh;
3673 	int to_nodeid, error;
3674 
3675 	to_nodeid = lkb->lkb_nodeid;
3676 
3677 	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3678 	if (error)
3679 		goto out;
3680 
3681 	send_args(r, lkb, ms);
3682 
3683 	ms->m_result = 0;
3684 
3685 	error = send_message(mh, ms, r->res_name, r->res_length);
3686  out:
3687 	return error;
3688 }
3689 
3690 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3691 {
3692 	struct dlm_message *ms;
3693 	struct dlm_mhandle *mh;
3694 	int to_nodeid, error;
3695 
3696 	to_nodeid = lkb->lkb_nodeid;
3697 
3698 	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3699 	if (error)
3700 		goto out;
3701 
3702 	send_args(r, lkb, ms);
3703 
3704 	ms->m_bastmode = cpu_to_le32(mode);
3705 
3706 	error = send_message(mh, ms, r->res_name, r->res_length);
3707  out:
3708 	return error;
3709 }
3710 
3711 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3712 {
3713 	struct dlm_message *ms;
3714 	struct dlm_mhandle *mh;
3715 	int to_nodeid, error;
3716 
3717 	to_nodeid = dlm_dir_nodeid(r);
3718 
3719 	error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3720 	if (error)
3721 		return error;
3722 
3723 	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3724 	if (error)
3725 		goto fail;
3726 
3727 	send_args(r, lkb, ms);
3728 
3729 	error = send_message(mh, ms, r->res_name, r->res_length);
3730 	if (error)
3731 		goto fail;
3732 	return 0;
3733 
3734  fail:
3735 	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3736 	return error;
3737 }
3738 
3739 static int send_remove(struct dlm_rsb *r)
3740 {
3741 	struct dlm_message *ms;
3742 	struct dlm_mhandle *mh;
3743 	int to_nodeid, error;
3744 
3745 	to_nodeid = dlm_dir_nodeid(r);
3746 
3747 	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3748 	if (error)
3749 		goto out;
3750 
3751 	memcpy(ms->m_extra, r->res_name, r->res_length);
3752 	ms->m_hash = cpu_to_le32(r->res_hash);
3753 
3754 	error = send_message(mh, ms, r->res_name, r->res_length);
3755  out:
3756 	return error;
3757 }
3758 
3759 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3760 			     int mstype, int rv)
3761 {
3762 	struct dlm_message *ms;
3763 	struct dlm_mhandle *mh;
3764 	int to_nodeid, error;
3765 
3766 	to_nodeid = lkb->lkb_nodeid;
3767 
3768 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3769 	if (error)
3770 		goto out;
3771 
3772 	send_args(r, lkb, ms);
3773 
3774 	ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3775 
3776 	error = send_message(mh, ms, r->res_name, r->res_length);
3777  out:
3778 	return error;
3779 }
3780 
/* send a DLM_MSG_REQUEST_REPLY carrying result rv via send_common_reply() */
static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
}
3785 
/* send a DLM_MSG_CONVERT_REPLY carrying result rv via send_common_reply() */
static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
}
3790 
/* send a DLM_MSG_UNLOCK_REPLY carrying result rv via send_common_reply() */
static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
}
3795 
/* send a DLM_MSG_CANCEL_REPLY carrying result rv via send_common_reply() */
static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
}
3800 
3801 static int send_lookup_reply(struct dlm_ls *ls,
3802 			     const struct dlm_message *ms_in, int ret_nodeid,
3803 			     int rv)
3804 {
3805 	struct dlm_rsb *r = &ls->ls_local_rsb;
3806 	struct dlm_message *ms;
3807 	struct dlm_mhandle *mh;
3808 	int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
3809 
3810 	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3811 	if (error)
3812 		goto out;
3813 
3814 	ms->m_lkid = ms_in->m_lkid;
3815 	ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3816 	ms->m_nodeid = cpu_to_le32(ret_nodeid);
3817 
3818 	error = send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in));
3819  out:
3820 	return error;
3821 }
3822 
/* Which args we save from a received message depends heavily on the type
   of message, unlike the send side where we can safely send everything
   about the lkb for any type of message. */

/* receive_flags() takes the exflags, sbflags and dflags values from the
   message and stores them in the lkb. */

static void receive_flags(struct dlm_lkb *lkb, const struct dlm_message *ms)
{
	lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
	dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
	dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
}
3833 
3834 static void receive_flags_reply(struct dlm_lkb *lkb,
3835 				const struct dlm_message *ms,
3836 				bool local)
3837 {
3838 	if (local)
3839 		return;
3840 
3841 	dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3842 	dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3843 }
3844 
3845 static int receive_extralen(const struct dlm_message *ms)
3846 {
3847 	return (le16_to_cpu(ms->m_header.h_length) -
3848 		sizeof(struct dlm_message));
3849 }
3850 
3851 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3852 		       const struct dlm_message *ms)
3853 {
3854 	int len;
3855 
3856 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3857 		if (!lkb->lkb_lvbptr)
3858 			lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3859 		if (!lkb->lkb_lvbptr)
3860 			return -ENOMEM;
3861 		len = receive_extralen(ms);
3862 		if (len > ls->ls_lvblen)
3863 			len = ls->ls_lvblen;
3864 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3865 	}
3866 	return 0;
3867 }
3868 
/* placeholder bast callback: receive_request_args() installs it when the
   message's m_asts has DLM_CB_BAST set; it only logs that it was called */
static void fake_bastfn(void *astparam, int mode)
{
	log_print("fake_bastfn should not be called");
}
3873 
/* placeholder ast callback: receive_request_args() installs it when the
   message's m_asts has DLM_CB_CAST set; it only logs that it was called */
static void fake_astfn(void *astparam)
{
	log_print("fake_astfn should not be called");
}
3878 
3879 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3880 				const struct dlm_message *ms)
3881 {
3882 	lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3883 	lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
3884 	lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3885 	lkb->lkb_grmode = DLM_LOCK_IV;
3886 	lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3887 
3888 	lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;
3889 	lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL;
3890 
3891 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3892 		/* lkb was just created so there won't be an lvb yet */
3893 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3894 		if (!lkb->lkb_lvbptr)
3895 			return -ENOMEM;
3896 	}
3897 
3898 	return 0;
3899 }
3900 
/*
 * receive_convert_args() - take the conversion arguments from a received
 * convert message.
 *
 * Return: -EBUSY when the lkb is not in DLM_LKSTS_GRANTED status, -ENOMEM
 * when receive_lvb() fails, otherwise 0 after the requested mode and the
 * lvb sequence number have been copied from the message.
 */
static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				const struct dlm_message *ms)
{
	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
		return -EBUSY;

	/* any failure of receive_lvb() is reported as -ENOMEM */
	if (receive_lvb(ls, lkb, ms))
		return -ENOMEM;

	lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
	lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);

	return 0;
}
3915 
3916 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3917 			       const struct dlm_message *ms)
3918 {
3919 	if (receive_lvb(ls, lkb, ms))
3920 		return -ENOMEM;
3921 	return 0;
3922 }
3923 
3924 /* We fill in the local-lkb fields with the info that send_xxxx_reply()
3925    uses to send a reply and that the remote end uses to process the reply. */
3926 
3927 static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms)
3928 {
3929 	struct dlm_lkb *lkb = &ls->ls_local_lkb;
3930 	lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3931 	lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3932 }
3933 
3934 /* This is called after the rsb is locked so that we can safely inspect
3935    fields in the lkb. */
3936 
3937 static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
3938 {
3939 	int from = le32_to_cpu(ms->m_header.h_nodeid);
3940 	int error = 0;
3941 
3942 	/* currently mixing of user/kernel locks are not supported */
3943 	if (ms->m_flags & cpu_to_le32(BIT(DLM_DFL_USER_BIT)) &&
3944 	    !test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
3945 		log_error(lkb->lkb_resource->res_ls,
3946 			  "got user dlm message for a kernel lock");
3947 		error = -EINVAL;
3948 		goto out;
3949 	}
3950 
3951 	switch (ms->m_type) {
3952 	case cpu_to_le32(DLM_MSG_CONVERT):
3953 	case cpu_to_le32(DLM_MSG_UNLOCK):
3954 	case cpu_to_le32(DLM_MSG_CANCEL):
3955 		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3956 			error = -EINVAL;
3957 		break;
3958 
3959 	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3960 	case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
3961 	case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
3962 	case cpu_to_le32(DLM_MSG_GRANT):
3963 	case cpu_to_le32(DLM_MSG_BAST):
3964 		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3965 			error = -EINVAL;
3966 		break;
3967 
3968 	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3969 		if (!is_process_copy(lkb))
3970 			error = -EINVAL;
3971 		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3972 			error = -EINVAL;
3973 		break;
3974 
3975 	default:
3976 		error = -EINVAL;
3977 	}
3978 
3979 out:
3980 	if (error)
3981 		log_error(lkb->lkb_resource->res_ls,
3982 			  "ignore invalid message %d from %d %x %x %x %d",
3983 			  le32_to_cpu(ms->m_type), from, lkb->lkb_id,
3984 			  lkb->lkb_remid, dlm_iflags_val(lkb),
3985 			  lkb->lkb_nodeid);
3986 	return error;
3987 }
3988 
/*
 * receive_request() - handle a request message received from another node.
 *
 * A new lkb is created, flagged as a master copy and filled in from the
 * message; the rsb named in the message's extra data is found, and
 * do_request() is run on it.  The request reply is sent between
 * do_request() and do_request_effects().
 *
 * Return: 0 once do_request() has run, whatever its result (the result
 * travels to the sender in the reply).  If anything fails before that
 * point, a reply carrying the error is sent using the lockspace's
 * ls_local_rsb/ls_local_lkb and the error is returned.
 */
static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int from_nodeid;
	int error, namelen = 0;

	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);

	error = create_lkb(ls, &lkb);
	if (error)
		goto fail;

	/* flags first: receive_request_args() reads lkb_exflags */
	receive_flags(lkb, ms);
	set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
	error = receive_request_args(ls, lkb, ms);
	if (error) {
		__put_lkb(ls, lkb);
		goto fail;
	}

	/* The dir node is the authority on whether we are the master
	   for this rsb or not, so if the master sends us a request, we should
	   recreate the rsb if we've destroyed it.   This race happens when we
	   send a remove message to the dir node at the same time that the dir
	   node sends us a request for the rsb. */

	namelen = receive_extralen(ms);

	error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
			 R_RECEIVE_REQUEST, &r);
	if (error) {
		__put_lkb(ls, lkb);
		goto fail;
	}

	lock_rsb(r);

	/* the extra validation only runs when the rsb does not record us
	   as its master */
	if (r->res_master_nodeid != dlm_our_nodeid()) {
		error = validate_master_nodeid(ls, r, from_nodeid);
		if (error) {
			unlock_rsb(r);
			put_rsb(r);
			__put_lkb(ls, lkb);
			goto fail;
		}
	}

	attach_lkb(r, lkb);
	error = do_request(r, lkb);
	send_request_reply(r, lkb, error);
	do_request_effects(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);

	/* -EINPROGRESS is not a failure; for any other non-zero result the
	   lkb reference is dropped */
	if (error == -EINPROGRESS)
		error = 0;
	if (error)
		dlm_put_lkb(lkb);
	return 0;

 fail:
	/* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
	   and do this receive_request again from process_lookup_list once
	   we get the lookup reply.  This would avoid many repeated
	   ENOTBLK request failures when the lookup reply designating us
	   as master is delayed. */

	if (error != -ENOTBLK) {
		log_limit(ls, "receive_request %x from %d %d",
			  le32_to_cpu(ms->m_lkid), from_nodeid, error);
	}

	/* no usable lkb/rsb on this path: reply through the local stubs */
	setup_local_lkb(ls, ms);
	send_request_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
	return error;
}
4067 
4068 static int receive_convert(struct dlm_ls *ls, const struct dlm_message *ms)
4069 {
4070 	struct dlm_lkb *lkb;
4071 	struct dlm_rsb *r;
4072 	int error, reply = 1;
4073 
4074 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4075 	if (error)
4076 		goto fail;
4077 
4078 	if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4079 		log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4080 			  "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4081 			  (unsigned long long)lkb->lkb_recover_seq,
4082 			  le32_to_cpu(ms->m_header.h_nodeid),
4083 			  le32_to_cpu(ms->m_lkid));
4084 		error = -ENOENT;
4085 		dlm_put_lkb(lkb);
4086 		goto fail;
4087 	}
4088 
4089 	r = lkb->lkb_resource;
4090 
4091 	hold_rsb(r);
4092 	lock_rsb(r);
4093 
4094 	error = validate_message(lkb, ms);
4095 	if (error)
4096 		goto out;
4097 
4098 	receive_flags(lkb, ms);
4099 
4100 	error = receive_convert_args(ls, lkb, ms);
4101 	if (error) {
4102 		send_convert_reply(r, lkb, error);
4103 		goto out;
4104 	}
4105 
4106 	reply = !down_conversion(lkb);
4107 
4108 	error = do_convert(r, lkb);
4109 	if (reply)
4110 		send_convert_reply(r, lkb, error);
4111 	do_convert_effects(r, lkb, error);
4112  out:
4113 	unlock_rsb(r);
4114 	put_rsb(r);
4115 	dlm_put_lkb(lkb);
4116 	return 0;
4117 
4118  fail:
4119 	setup_local_lkb(ls, ms);
4120 	send_convert_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4121 	return error;
4122 }
4123 
4124 static int receive_unlock(struct dlm_ls *ls, const struct dlm_message *ms)
4125 {
4126 	struct dlm_lkb *lkb;
4127 	struct dlm_rsb *r;
4128 	int error;
4129 
4130 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4131 	if (error)
4132 		goto fail;
4133 
4134 	if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4135 		log_error(ls, "receive_unlock %x remid %x remote %d %x",
4136 			  lkb->lkb_id, lkb->lkb_remid,
4137 			  le32_to_cpu(ms->m_header.h_nodeid),
4138 			  le32_to_cpu(ms->m_lkid));
4139 		error = -ENOENT;
4140 		dlm_put_lkb(lkb);
4141 		goto fail;
4142 	}
4143 
4144 	r = lkb->lkb_resource;
4145 
4146 	hold_rsb(r);
4147 	lock_rsb(r);
4148 
4149 	error = validate_message(lkb, ms);
4150 	if (error)
4151 		goto out;
4152 
4153 	receive_flags(lkb, ms);
4154 
4155 	error = receive_unlock_args(ls, lkb, ms);
4156 	if (error) {
4157 		send_unlock_reply(r, lkb, error);
4158 		goto out;
4159 	}
4160 
4161 	error = do_unlock(r, lkb);
4162 	send_unlock_reply(r, lkb, error);
4163 	do_unlock_effects(r, lkb, error);
4164  out:
4165 	unlock_rsb(r);
4166 	put_rsb(r);
4167 	dlm_put_lkb(lkb);
4168 	return 0;
4169 
4170  fail:
4171 	setup_local_lkb(ls, ms);
4172 	send_unlock_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4173 	return error;
4174 }
4175 
4176 static int receive_cancel(struct dlm_ls *ls, const struct dlm_message *ms)
4177 {
4178 	struct dlm_lkb *lkb;
4179 	struct dlm_rsb *r;
4180 	int error;
4181 
4182 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4183 	if (error)
4184 		goto fail;
4185 
4186 	receive_flags(lkb, ms);
4187 
4188 	r = lkb->lkb_resource;
4189 
4190 	hold_rsb(r);
4191 	lock_rsb(r);
4192 
4193 	error = validate_message(lkb, ms);
4194 	if (error)
4195 		goto out;
4196 
4197 	error = do_cancel(r, lkb);
4198 	send_cancel_reply(r, lkb, error);
4199 	do_cancel_effects(r, lkb, error);
4200  out:
4201 	unlock_rsb(r);
4202 	put_rsb(r);
4203 	dlm_put_lkb(lkb);
4204 	return 0;
4205 
4206  fail:
4207 	setup_local_lkb(ls, ms);
4208 	send_cancel_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4209 	return error;
4210 }
4211 
4212 static int receive_grant(struct dlm_ls *ls, const struct dlm_message *ms)
4213 {
4214 	struct dlm_lkb *lkb;
4215 	struct dlm_rsb *r;
4216 	int error;
4217 
4218 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4219 	if (error)
4220 		return error;
4221 
4222 	r = lkb->lkb_resource;
4223 
4224 	hold_rsb(r);
4225 	lock_rsb(r);
4226 
4227 	error = validate_message(lkb, ms);
4228 	if (error)
4229 		goto out;
4230 
4231 	receive_flags_reply(lkb, ms, false);
4232 	if (is_altmode(lkb))
4233 		munge_altmode(lkb, ms);
4234 	grant_lock_pc(r, lkb, ms);
4235 	queue_cast(r, lkb, 0);
4236  out:
4237 	unlock_rsb(r);
4238 	put_rsb(r);
4239 	dlm_put_lkb(lkb);
4240 	return 0;
4241 }
4242 
4243 static int receive_bast(struct dlm_ls *ls, const struct dlm_message *ms)
4244 {
4245 	struct dlm_lkb *lkb;
4246 	struct dlm_rsb *r;
4247 	int error;
4248 
4249 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4250 	if (error)
4251 		return error;
4252 
4253 	r = lkb->lkb_resource;
4254 
4255 	hold_rsb(r);
4256 	lock_rsb(r);
4257 
4258 	error = validate_message(lkb, ms);
4259 	if (error)
4260 		goto out;
4261 
4262 	queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode));
4263 	lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
4264  out:
4265 	unlock_rsb(r);
4266 	put_rsb(r);
4267 	dlm_put_lkb(lkb);
4268 	return 0;
4269 }
4270 
4271 static void receive_lookup(struct dlm_ls *ls, const struct dlm_message *ms)
4272 {
4273 	int len, error, ret_nodeid, from_nodeid, our_nodeid;
4274 
4275 	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4276 	our_nodeid = dlm_our_nodeid();
4277 
4278 	len = receive_extralen(ms);
4279 
4280 	error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4281 				  &ret_nodeid, NULL);
4282 
4283 	/* Optimization: we're master so treat lookup as a request */
4284 	if (!error && ret_nodeid == our_nodeid) {
4285 		receive_request(ls, ms);
4286 		return;
4287 	}
4288 	send_lookup_reply(ls, ms, ret_nodeid, error);
4289 }
4290 
/*
 * Handle a DLM_MSG_REMOVE message sent to us as directory node.
 *
 * The message names a resource (m_extra, length from receive_extralen())
 * whose rsb the sender wants dropped.  The rsb is freed only if it is
 * found in ls_rsbtbl, is still hashed, is flagged RSB_INACTIVE and its
 * res_master_nodeid matches the sender.  Every other outcome is logged and
 * the message is ignored.  No reply is sent.
 */
static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
{
	/* one extra byte so the zero-filled copy is NUL-terminated for %s */
	char name[DLM_RESNAME_MAXLEN+1];
	struct dlm_rsb *r;
	int rv, len, dir_nodeid, from_nodeid;

	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);

	len = receive_extralen(ms);

	/* reject names that would not fit into name[] */
	if (len > DLM_RESNAME_MAXLEN) {
		log_error(ls, "receive_remove from %d bad len %d",
			  from_nodeid, len);
		return;
	}

	/* the hash in the message must map to this node as dir node */
	dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash));
	if (dir_nodeid != dlm_our_nodeid()) {
		log_error(ls, "receive_remove from %d bad nodeid %d",
			  from_nodeid, dir_nodeid);
		return;
	}

	/*
	 * Look for inactive rsb, if it's there, free it.
	 * If the rsb is active, it's being used, and we should ignore this
	 * message.  This is an expected race between the dir node sending a
	 * request to the master node at the same time as the master node sends
	 * a remove to the dir node.  The resolution to that race is for the
	 * dir node to ignore the remove message, and the master node to
	 * recreate the master rsb when it gets a request from the dir node for
	 * an rsb it doesn't have.
	 */

	memset(name, 0, sizeof(name));
	memcpy(name, ms->m_extra, len);

	/*
	 * The RCU read section covers the lookup and stays open until the
	 * table write lock is held and RSB_HASHED has been rechecked.
	 */
	rcu_read_lock();
	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
	if (rv) {
		rcu_read_unlock();
		/* should not happen */
		log_error(ls, "%s from %d not found %s", __func__,
			  from_nodeid, name);
		return;
	}

	write_lock_bh(&ls->ls_rsbtbl_lock);
	if (!rsb_flag(r, RSB_HASHED)) {
		rcu_read_unlock();
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		/* should not happen */
		log_error(ls, "%s from %d got removed during removal %s",
			  __func__, from_nodeid, name);
		return;
	}
	/* at this stage the rsb can only be freed here */
	rcu_read_unlock();

	if (!rsb_flag(r, RSB_INACTIVE)) {
		if (r->res_master_nodeid != from_nodeid) {
			/* should not happen */
			log_error(ls, "receive_remove on active rsb from %d master %d",
				  from_nodeid, r->res_master_nodeid);
			dlm_print_rsb(r);
			write_unlock_bh(&ls->ls_rsbtbl_lock);
			return;
		}

		/* Ignore the remove message, see race comment above. */

		log_debug(ls, "receive_remove from %d master %d first %x %s",
			  from_nodeid, r->res_master_nodeid, r->res_first_lkid,
			  name);
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		return;
	}

	/* inactive rsb: only the node recorded as its master may remove it */
	if (r->res_master_nodeid != from_nodeid) {
		log_error(ls, "receive_remove inactive from %d master %d",
			  from_nodeid, r->res_master_nodeid);
		dlm_print_rsb(r);
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		return;
	}

	/* unlink from the slow list and the hash table under the write lock */
	list_del(&r->res_slow_list);
	rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
			       dlm_rhash_rsb_params);
	rsb_clear_flag(r, RSB_HASHED);
	write_unlock_bh(&ls->ls_rsbtbl_lock);

	/* the rsb is freed after the table lock has been dropped */
	free_inactive_rsb(r);
}
4385 
4386 static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms)
4387 {
4388 	do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
4389 }
4390 
/*
 * Handle a DLM_MSG_REQUEST_REPLY message.
 *
 * The lkb is looked up by m_remid.  With the rsb held and locked the
 * message is validated and the lkb is taken off the waiters list.  The
 * result carried in m_result (the do_request() return value on the
 * master) then decides what happens:
 *
 *   -EAGAIN           completion cast with -EAGAIN, lkb reference dropped
 *   -EINPROGRESS, 0   remote lkid recorded; lkb queued as waiting or granted
 *   -EBADR, -ENOTBLK  master info possibly reset, request retried (or, for
 *                     an overlapping op, completed via queue_cast_overlap())
 *   anything else     logged as an error
 *
 * Finally a pending overlapping unlock or cancel is sent or cleared.
 *
 * Returns the find_lkb() error if the lkb is unknown, otherwise 0.
 */
static int receive_request_reply(struct dlm_ls *ls,
				 const struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, mstype, result;
	int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);

	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
	if (error)
		return error;

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	error = validate_message(lkb, ms);
	if (error)
		goto out;

	/* saved before remove_from_waiters() is called; tested below */
	mstype = lkb->lkb_wait_type;
	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
	if (error) {
		log_error(ls, "receive_request_reply %x remote %d %x result %d",
			  lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid),
			  from_dlm_errno(le32_to_cpu(ms->m_result)));
		dlm_dump_rsb(r);
		goto out;
	}

	/* Optimization: the dir node was also the master, so it took our
	   lookup as a request and sent request reply instead of lookup reply */
	if (mstype == DLM_MSG_LOOKUP) {
		r->res_master_nodeid = from_nodeid;
		r->res_nodeid = from_nodeid;
		lkb->lkb_nodeid = from_nodeid;
	}

	/* this is the value returned from do_request() on the master */
	result = from_dlm_errno(le32_to_cpu(ms->m_result));

	switch (result) {
	case -EAGAIN:
		/* request would block (be queued) on remote master */
		queue_cast(r, lkb, -EAGAIN);
		confirm_master(r, -EAGAIN);
		unhold_lkb(lkb); /* undoes create_lkb() */
		break;

	case -EINPROGRESS:
	case 0:
		/* request was queued or granted on remote master */
		receive_flags_reply(lkb, ms, false);
		lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
		if (is_altmode(lkb))
			munge_altmode(lkb, ms);
		/* non-zero here can only be -EINPROGRESS: queued, not granted */
		if (result) {
			add_lkb(r, lkb, DLM_LKSTS_WAITING);
		} else {
			grant_lock_pc(r, lkb, ms);
			queue_cast(r, lkb, 0);
		}
		confirm_master(r, result);
		break;

	case -EBADR:
	case -ENOTBLK:
		/* find_rsb failed to find rsb or rsb wasn't master */
		log_limit(ls, "receive_request_reply %x from %d %d "
			  "master %d dir %d first %x %s", lkb->lkb_id,
			  from_nodeid, result, r->res_master_nodeid,
			  r->res_dir_nodeid, r->res_first_lkid, r->res_name);

		/* forget the master only when we are neither dir nor master */
		if (r->res_dir_nodeid != dlm_our_nodeid() &&
		    r->res_master_nodeid != dlm_our_nodeid()) {
			/* cause _request_lock->set_master->send_lookup */
			r->res_master_nodeid = 0;
			r->res_nodeid = -1;
			lkb->lkb_nodeid = -1;
		}

		if (is_overlap(lkb)) {
			/* we'll ignore error in cancel/unlock reply */
			queue_cast_overlap(r, lkb);
			confirm_master(r, result);
			unhold_lkb(lkb); /* undoes create_lkb() */
		} else {
			/* retry the request with the updated master info */
			_request_lock(r, lkb);

			if (r->res_master_nodeid == dlm_our_nodeid())
				confirm_master(r, 0);
		}
		break;

	default:
		log_error(ls, "receive_request_reply %x error %d",
			  lkb->lkb_id, result);
	}

	/*
	 * Overlapping operations: an unlock is sent when the request was
	 * granted or queued and an overlap-unlock was pending; a cancel is
	 * sent only when the request was queued and an overlap-cancel was
	 * pending.  In all cases both overlap bits end up cleared.
	 */
	if ((result == 0 || result == -EINPROGRESS) &&
	    test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
		log_debug(ls, "receive_request_reply %x result %d unlock",
			  lkb->lkb_id, result);
		clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
		send_unlock(r, lkb);
	} else if ((result == -EINPROGRESS) &&
		   test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
				      &lkb->lkb_iflags)) {
		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
		clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
		send_cancel(r, lkb);
	} else {
		clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
		clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
	}
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return 0;
}
4512 
4513 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4514 				    const struct dlm_message *ms, bool local)
4515 {
4516 	/* this is the value returned from do_convert() on the master */
4517 	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4518 	case -EAGAIN:
4519 		/* convert would block (be queued) on remote master */
4520 		queue_cast(r, lkb, -EAGAIN);
4521 		break;
4522 
4523 	case -EDEADLK:
4524 		receive_flags_reply(lkb, ms, local);
4525 		revert_lock_pc(r, lkb);
4526 		queue_cast(r, lkb, -EDEADLK);
4527 		break;
4528 
4529 	case -EINPROGRESS:
4530 		/* convert was queued on remote master */
4531 		receive_flags_reply(lkb, ms, local);
4532 		if (is_demoted(lkb))
4533 			munge_demoted(lkb);
4534 		del_lkb(r, lkb);
4535 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4536 		break;
4537 
4538 	case 0:
4539 		/* convert was granted on remote master */
4540 		receive_flags_reply(lkb, ms, local);
4541 		if (is_demoted(lkb))
4542 			munge_demoted(lkb);
4543 		grant_lock_pc(r, lkb, ms);
4544 		queue_cast(r, lkb, 0);
4545 		break;
4546 
4547 	default:
4548 		log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4549 			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4550 			  le32_to_cpu(ms->m_lkid),
4551 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4552 		dlm_print_rsb(r);
4553 		dlm_print_lkb(lkb);
4554 	}
4555 }
4556 
4557 static void _receive_convert_reply(struct dlm_lkb *lkb,
4558 				   const struct dlm_message *ms, bool local)
4559 {
4560 	struct dlm_rsb *r = lkb->lkb_resource;
4561 	int error;
4562 
4563 	hold_rsb(r);
4564 	lock_rsb(r);
4565 
4566 	error = validate_message(lkb, ms);
4567 	if (error)
4568 		goto out;
4569 
4570 	error = remove_from_waiters_ms(lkb, ms, local);
4571 	if (error)
4572 		goto out;
4573 
4574 	__receive_convert_reply(r, lkb, ms, local);
4575  out:
4576 	unlock_rsb(r);
4577 	put_rsb(r);
4578 }
4579 
4580 static int receive_convert_reply(struct dlm_ls *ls,
4581 				 const struct dlm_message *ms)
4582 {
4583 	struct dlm_lkb *lkb;
4584 	int error;
4585 
4586 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4587 	if (error)
4588 		return error;
4589 
4590 	_receive_convert_reply(lkb, ms, false);
4591 	dlm_put_lkb(lkb);
4592 	return 0;
4593 }
4594 
4595 static void _receive_unlock_reply(struct dlm_lkb *lkb,
4596 				  const struct dlm_message *ms, bool local)
4597 {
4598 	struct dlm_rsb *r = lkb->lkb_resource;
4599 	int error;
4600 
4601 	hold_rsb(r);
4602 	lock_rsb(r);
4603 
4604 	error = validate_message(lkb, ms);
4605 	if (error)
4606 		goto out;
4607 
4608 	error = remove_from_waiters_ms(lkb, ms, local);
4609 	if (error)
4610 		goto out;
4611 
4612 	/* this is the value returned from do_unlock() on the master */
4613 
4614 	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4615 	case -DLM_EUNLOCK:
4616 		receive_flags_reply(lkb, ms, local);
4617 		remove_lock_pc(r, lkb);
4618 		queue_cast(r, lkb, -DLM_EUNLOCK);
4619 		break;
4620 	case -ENOENT:
4621 		break;
4622 	default:
4623 		log_error(r->res_ls, "receive_unlock_reply %x error %d",
4624 			  lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
4625 	}
4626  out:
4627 	unlock_rsb(r);
4628 	put_rsb(r);
4629 }
4630 
4631 static int receive_unlock_reply(struct dlm_ls *ls,
4632 				const struct dlm_message *ms)
4633 {
4634 	struct dlm_lkb *lkb;
4635 	int error;
4636 
4637 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4638 	if (error)
4639 		return error;
4640 
4641 	_receive_unlock_reply(lkb, ms, false);
4642 	dlm_put_lkb(lkb);
4643 	return 0;
4644 }
4645 
4646 static void _receive_cancel_reply(struct dlm_lkb *lkb,
4647 				  const struct dlm_message *ms, bool local)
4648 {
4649 	struct dlm_rsb *r = lkb->lkb_resource;
4650 	int error;
4651 
4652 	hold_rsb(r);
4653 	lock_rsb(r);
4654 
4655 	error = validate_message(lkb, ms);
4656 	if (error)
4657 		goto out;
4658 
4659 	error = remove_from_waiters_ms(lkb, ms, local);
4660 	if (error)
4661 		goto out;
4662 
4663 	/* this is the value returned from do_cancel() on the master */
4664 
4665 	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4666 	case -DLM_ECANCEL:
4667 		receive_flags_reply(lkb, ms, local);
4668 		revert_lock_pc(r, lkb);
4669 		queue_cast(r, lkb, -DLM_ECANCEL);
4670 		break;
4671 	case 0:
4672 		break;
4673 	default:
4674 		log_error(r->res_ls, "receive_cancel_reply %x error %d",
4675 			  lkb->lkb_id,
4676 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4677 	}
4678  out:
4679 	unlock_rsb(r);
4680 	put_rsb(r);
4681 }
4682 
4683 static int receive_cancel_reply(struct dlm_ls *ls,
4684 				const struct dlm_message *ms)
4685 {
4686 	struct dlm_lkb *lkb;
4687 	int error;
4688 
4689 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4690 	if (error)
4691 		return error;
4692 
4693 	_receive_cancel_reply(lkb, ms, false);
4694 	dlm_put_lkb(lkb);
4695 	return 0;
4696 }
4697 
/*
 * Handle a DLM_MSG_LOOKUP_REPLY message from a directory node.
 *
 * Unlike the other reply handlers here, the lkb is looked up by m_lkid.
 * With the rsb held and locked the lkb is removed from the waiters list,
 * the master recorded in the rsb is updated from m_nodeid, and the request
 * is then issued through _request_lock() (or completed through
 * queue_cast_overlap() if an overlapping operation is pending).  When this
 * node turns out to be the master, process_lookup_list() is run as well.
 */
static void receive_lookup_reply(struct dlm_ls *ls,
				 const struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, ret_nodeid;
	int do_lookup_list = 0;

	error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
	if (error) {
		log_error(ls, "%s no lkid %x", __func__,
			  le32_to_cpu(ms->m_lkid));
		return;
	}

	/* ms->m_result is the value returned by dlm_master_lookup on dir node
	   FIXME: will a non-zero error ever be returned? */

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
	if (error)
		goto out;

	ret_nodeid = le32_to_cpu(ms->m_nodeid);

	/* We sometimes receive a request from the dir node for this
	   rsb before we've received the dir node's lookup_reply for it.
	   The request from the dir node implies we're the master, so we set
	   ourself as master in receive_request_reply, and verify here that
	   we are indeed the master. */

	/* a master is already recorded and disagrees with the reply: log only */
	if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
		/* This should never happen */
		log_error(ls, "receive_lookup_reply %x from %d ret %d "
			  "master %d dir %d our %d first %x %s",
			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
			  ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
			  dlm_our_nodeid(), r->res_first_lkid, r->res_name);
	}

	if (ret_nodeid == dlm_our_nodeid()) {
		/* we are the master: res_nodeid is 0 for a local master */
		r->res_master_nodeid = ret_nodeid;
		r->res_nodeid = 0;
		do_lookup_list = 1;
		r->res_first_lkid = 0;
	} else if (ret_nodeid == -1) {
		/* the remote node doesn't believe it's the dir node */
		log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
		/* same reset as used elsewhere to force a new lookup */
		r->res_master_nodeid = 0;
		r->res_nodeid = -1;
		lkb->lkb_nodeid = -1;
	} else {
		/* set_master() will set lkb_nodeid from r */
		r->res_master_nodeid = ret_nodeid;
		r->res_nodeid = ret_nodeid;
	}

	if (is_overlap(lkb)) {
		log_debug(ls, "receive_lookup_reply %x unlock %x",
			  lkb->lkb_id, dlm_iflags_val(lkb));
		queue_cast_overlap(r, lkb);
		unhold_lkb(lkb); /* undoes create_lkb() */
		goto out_list;
	}

	_request_lock(r, lkb);

 out_list:
	/* only set when the reply named this node as master */
	if (do_lookup_list)
		process_lookup_list(r);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
4777 
/*
 * Dispatch one DLM message to the handler for its m_type.
 *
 * Messages from nodes that are not members of the lockspace are logged
 * (with a one-time WARN) and dropped.  After the handler has run, -ENOENT
 * and -EINVAL results are logged.  @saved_seq is only used in these log
 * messages.  The lookup, remove, lookup-reply and purge handlers return
 * nothing, so error stays 0 for those message types.
 */
static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
			     uint32_t saved_seq)
{
	/* noent: -ENOENT is expected for this message type, log at debug level */
	int error = 0, noent = 0;

	if (WARN_ON_ONCE(!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid)))) {
		log_limit(ls, "receive %d from non-member %d %x %x %d",
			  le32_to_cpu(ms->m_type),
			  le32_to_cpu(ms->m_header.h_nodeid),
			  le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
			  from_dlm_errno(le32_to_cpu(ms->m_result)));
		return;
	}

	/* m_type is little-endian, so the case labels are converted instead */
	switch (ms->m_type) {

	/* messages sent to a master node */

	case cpu_to_le32(DLM_MSG_REQUEST):
		error = receive_request(ls, ms);
		break;

	case cpu_to_le32(DLM_MSG_CONVERT):
		error = receive_convert(ls, ms);
		break;

	case cpu_to_le32(DLM_MSG_UNLOCK):
		error = receive_unlock(ls, ms);
		break;

	case cpu_to_le32(DLM_MSG_CANCEL):
		noent = 1;
		error = receive_cancel(ls, ms);
		break;

	/* messages sent from a master node (replies to above) */

	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
		error = receive_request_reply(ls, ms);
		break;

	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
		error = receive_convert_reply(ls, ms);
		break;

	case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
		error = receive_unlock_reply(ls, ms);
		break;

	case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
		error = receive_cancel_reply(ls, ms);
		break;

	/* messages sent from a master node (only two types of async msg) */

	case cpu_to_le32(DLM_MSG_GRANT):
		noent = 1;
		error = receive_grant(ls, ms);
		break;

	case cpu_to_le32(DLM_MSG_BAST):
		noent = 1;
		error = receive_bast(ls, ms);
		break;

	/* messages sent to a dir node */

	case cpu_to_le32(DLM_MSG_LOOKUP):
		receive_lookup(ls, ms);
		break;

	case cpu_to_le32(DLM_MSG_REMOVE):
		receive_remove(ls, ms);
		break;

	/* messages sent from a dir node (remove has no reply) */

	case cpu_to_le32(DLM_MSG_LOOKUP_REPLY):
		receive_lookup_reply(ls, ms);
		break;

	/* other messages */

	case cpu_to_le32(DLM_MSG_PURGE):
		receive_purge(ls, ms);
		break;

	default:
		log_error(ls, "unknown message type %d",
			  le32_to_cpu(ms->m_type));
	}

	/*
	 * When checking for ENOENT, we're checking the result of
	 * find_lkb(m_remid):
	 *
	 * The lock id referenced in the message wasn't found.  This may
	 * happen in normal usage for the async messages and cancel, so
	 * only use log_debug for them.
	 *
	 * Some errors are expected and normal.
	 */

	if (error == -ENOENT && noent) {
		log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
			  le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
			  le32_to_cpu(ms->m_header.h_nodeid),
			  le32_to_cpu(ms->m_lkid), saved_seq);
	} else if (error == -ENOENT) {
		log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
			  le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
			  le32_to_cpu(ms->m_header.h_nodeid),
			  le32_to_cpu(ms->m_lkid), saved_seq);

		/* for a failed convert, additionally dump the rsbs for m_hash */
		if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT))
			dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash));
	}

	if (error == -EINVAL) {
		log_error(ls, "receive %d inval from %d lkid %x remid %x "
			  "saved_seq %u",
			  le32_to_cpu(ms->m_type),
			  le32_to_cpu(ms->m_header.h_nodeid),
			  le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
			  saved_seq);
	}
}
4905 
/* If the lockspace is in recovery mode (locking stopped), then normal
   messages are saved on the requestqueue for processing after recovery is
   done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
   messages off the requestqueue before we process new ones. This occurs right
   after recovery completes when we transition from saving all messages on
   requestqueue, to processing all the saved messages, to processing new
   messages as they arrive.

   Locking: LSFL_RECV_MSG_BLOCKED is first tested under the read side of
   ls_requestqueue_lock.  If it is clear, the message is processed right
   away with the read lock still held.  If it is set, the read lock is
   dropped, the write lock is taken and the flag is tested again; should it
   have been cleared in between, the whole sequence starts over.  Only with
   the write lock held and the flag still set is the message queued. */

static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
				int nodeid)
{
try_again:
	read_lock_bh(&ls->ls_requestqueue_lock);
	if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
		/* If we were a member of this lockspace, left, and rejoined,
		   other nodes may still be sending us messages from the
		   lockspace generation before we left. */
		if (WARN_ON_ONCE(!ls->ls_generation)) {
			/* message is dropped, not queued */
			read_unlock_bh(&ls->ls_requestqueue_lock);
			log_limit(ls, "receive %d from %d ignore old gen",
				  le32_to_cpu(ms->m_type), nodeid);
			return;
		}

		/* switch from the read lock to the write lock for queueing */
		read_unlock_bh(&ls->ls_requestqueue_lock);
		write_lock_bh(&ls->ls_requestqueue_lock);
		/* recheck because we hold writelock now */
		if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
			write_unlock_bh(&ls->ls_requestqueue_lock);
			goto try_again;
		}

		dlm_add_requestqueue(ls, nodeid, ms);
		write_unlock_bh(&ls->ls_requestqueue_lock);
	} else {
		/* not blocked: process now; saved_seq is 0 for live messages */
		_receive_message(ls, ms, 0);
		read_unlock_bh(&ls->ls_requestqueue_lock);
	}
}
4945 
/* This is called by dlm_recoverd to process messages that were saved on
   the requestqueue.  The message is passed straight to _receive_message()
   without touching ls_requestqueue_lock here; saved_seq is forwarded and
   only shows up in the log messages printed by _receive_message(). */

void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
			       uint32_t saved_seq)
{
	_receive_message(ls, ms, saved_seq);
}
4954 
4955 /* This is called by the midcomms layer when something is received for
4956    the lockspace.  It could be either a MSG (normal message sent as part of
4957    standard locking activity) or an RCOM (recovery message sent as part of
4958    lockspace recovery). */
4959 
4960 void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
4961 {
4962 	const struct dlm_header *hd = &p->header;
4963 	struct dlm_ls *ls;
4964 	int type = 0;
4965 
4966 	switch (hd->h_cmd) {
4967 	case DLM_MSG:
4968 		type = le32_to_cpu(p->message.m_type);
4969 		break;
4970 	case DLM_RCOM:
4971 		type = le32_to_cpu(p->rcom.rc_type);
4972 		break;
4973 	default:
4974 		log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4975 		return;
4976 	}
4977 
4978 	if (le32_to_cpu(hd->h_nodeid) != nodeid) {
4979 		log_print("invalid h_nodeid %d from %d lockspace %x",
4980 			  le32_to_cpu(hd->h_nodeid), nodeid,
4981 			  le32_to_cpu(hd->u.h_lockspace));
4982 		return;
4983 	}
4984 
4985 	ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace));
4986 	if (!ls) {
4987 		if (dlm_config.ci_log_debug) {
4988 			printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
4989 				"%u from %d cmd %d type %d\n",
4990 				le32_to_cpu(hd->u.h_lockspace), nodeid,
4991 				hd->h_cmd, type);
4992 		}
4993 
4994 		if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4995 			dlm_send_ls_not_ready(nodeid, &p->rcom);
4996 		return;
4997 	}
4998 
4999 	/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
5000 	   be inactive (in this ls) before transitioning to recovery mode */
5001 
5002 	read_lock_bh(&ls->ls_recv_active);
5003 	if (hd->h_cmd == DLM_MSG)
5004 		dlm_receive_message(ls, &p->message, nodeid);
5005 	else if (hd->h_cmd == DLM_RCOM)
5006 		dlm_receive_rcom(ls, &p->rcom, nodeid);
5007 	else
5008 		log_error(ls, "invalid h_cmd %d from %d lockspace %x",
5009 			  hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
5010 	read_unlock_bh(&ls->ls_recv_active);
5011 
5012 	dlm_put_lockspace(ls);
5013 }
5014 
5015 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
5016 				   struct dlm_message *ms_local)
5017 {
5018 	if (middle_conversion(lkb)) {
5019 		hold_lkb(lkb);
5020 		memset(ms_local, 0, sizeof(struct dlm_message));
5021 		ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
5022 		ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
5023 		ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5024 		_receive_convert_reply(lkb, ms_local, true);
5025 
5026 		/* Same special case as in receive_rcom_lock_args() */
5027 		lkb->lkb_grmode = DLM_LOCK_IV;
5028 		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
5029 		unhold_lkb(lkb);
5030 
5031 	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
5032 		set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5033 	}
5034 
5035 	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5036 	   conversions are async; there's no reply from the remote master */
5037 }
5038 
5039 /* A waiting lkb needs recovery if the master node has failed, or
5040    the master node is changing (only when no directory is used) */
5041 
5042 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5043 				 int dir_nodeid)
5044 {
5045 	if (dlm_no_directory(ls))
5046 		return 1;
5047 
5048 	if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5049 		return 1;
5050 
5051 	return 0;
5052 }
5053 
/* Recovery for locks that are waiting for replies from nodes that are now
   gone.  We can just complete unlocks and cancels by faking a reply from the
   dead node.  Requests and up-conversions we flag to be resent after
   recovery.  Down-conversions can just be completed with a fake reply like
   unlocks.  Conversions between PR and CW need special attention.

   One scratch dlm_message is allocated and reused for every fake reply.
   NOTE(review): if that allocation fails the function returns without
   processing any waiter and without telling the caller.
   NOTE(review): ls_waiters is walked without taking ls_waiters_lock here;
   presumably safe because locking activity is stopped at this point of
   recovery - confirm against the caller. */

void dlm_recover_waiters_pre(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb, *safe;
	struct dlm_message *ms_local;
	int wait_type, local_unlock_result, local_cancel_result;
	int dir_nodeid;

	ms_local = kmalloc(sizeof(*ms_local), GFP_KERNEL);
	if (!ms_local)
		return;

	/* _safe variant: the current lkb may be unlinked while it is handled */
	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {

		dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);

		/* exclude debug messages about unlocks because there can be so
		   many and they aren't very interesting */

		if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
			log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
				  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
				  lkb->lkb_id,
				  lkb->lkb_remid,
				  lkb->lkb_wait_type,
				  lkb->lkb_resource->res_nodeid,
				  lkb->lkb_nodeid,
				  lkb->lkb_wait_nodeid,
				  dir_nodeid);
		}

		/* all outstanding lookups, regardless of destination  will be
		   resent after recovery is done */

		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
			set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
			continue;
		}

		if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
			continue;

		/* defaults for the faked replies; adjusted below for overlaps */
		wait_type = lkb->lkb_wait_type;
		local_unlock_result = -DLM_EUNLOCK;
		local_cancel_result = -DLM_ECANCEL;

		/* Main reply may have been received leaving a zero wait_type,
		   but a reply for the overlapping op may not have been
		   received.  In that case we need to fake the appropriate
		   reply for the overlap op. */

		if (!wait_type) {
			if (is_overlap_cancel(lkb)) {
				wait_type = DLM_MSG_CANCEL;
				if (lkb->lkb_grmode == DLM_LOCK_IV)
					local_cancel_result = 0;
			}
			/* tested second, so an overlapping unlock wins over a
			   cancel when both are set */
			if (is_overlap_unlock(lkb)) {
				wait_type = DLM_MSG_UNLOCK;
				if (lkb->lkb_grmode == DLM_LOCK_IV)
					local_unlock_result = -ENOENT;
			}

			log_debug(ls, "rwpre overlap %x %x %d %d %d",
				  lkb->lkb_id, dlm_iflags_val(lkb), wait_type,
				  local_cancel_result, local_unlock_result);
		}

		switch (wait_type) {

		case DLM_MSG_REQUEST:
			set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
			break;

		case DLM_MSG_CONVERT:
			recover_convert_waiter(ls, lkb, ms_local);
			break;

		case DLM_MSG_UNLOCK:
			/* extra reference held across the faked unlock reply */
			hold_lkb(lkb);
			memset(ms_local, 0, sizeof(struct dlm_message));
			ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
			ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result));
			ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
			_receive_unlock_reply(lkb, ms_local, true);
			dlm_put_lkb(lkb);
			break;

		case DLM_MSG_CANCEL:
			/* extra reference held across the faked cancel reply */
			hold_lkb(lkb);
			memset(ms_local, 0, sizeof(struct dlm_message));
			ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
			ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result));
			ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
			_receive_cancel_reply(lkb, ms_local, true);
			dlm_put_lkb(lkb);
			break;

		default:
			log_error(ls, "invalid lkb wait_type %d %d",
				  lkb->lkb_wait_type, wait_type);
		}
		/* give other tasks a chance to run between waiters */
		schedule();
	}
	kfree(ms_local);
}
5165 
5166 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5167 {
5168 	struct dlm_lkb *lkb = NULL, *iter;
5169 
5170 	spin_lock_bh(&ls->ls_waiters_lock);
5171 	list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
5172 		if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
5173 			hold_lkb(iter);
5174 			lkb = iter;
5175 			break;
5176 		}
5177 	}
5178 	spin_unlock_bh(&ls->ls_waiters_lock);
5179 
5180 	return lkb;
5181 }
5182 
5183 /*
5184  * Forced state reset for locks that were in the middle of remote operations
5185  * when recovery happened (i.e. lkbs that were on the waiters list, waiting
5186  * for a reply from a remote operation.)  The lkbs remaining on the waiters
5187  * list need to be reevaluated; some may need resending to a different node
5188  * than previously, and some may now need local handling rather than remote.
5189  *
5190  * First, the lkb state for the voided remote operation is forcibly reset,
5191  * equivalent to what remove_from_waiters() would normally do:
5192  * . lkb removed from ls_waiters list
5193  * . lkb wait_type cleared
5194  * . lkb waiters_count cleared
5195  * . lkb ref count decremented for each waiters_count (almost always 1,
5196  *   but possibly 2 in case of cancel/unlock overlapping, which means
5197  *   two remote replies were being expected for the lkb.)
5198  *
5199  * Second, the lkb is reprocessed like an original operation would be,
5200  * by passing it to _request_lock or _convert_lock, which will either
5201  * process the lkb operation locally, or send it to a remote node again
5202  * and put the lkb back onto the waiters list.
5203  *
5204  * When reprocessing the lkb, we may find that it's flagged for an overlapping
5205  * force-unlock or cancel, either from before recovery began, or after recovery
5206  * finished.  If this is the case, the unlock/cancel is done directly, and the
5207  * original operation is not initiated again (no _request_lock/_convert_lock.)
5208  */
5209 
/*
 * Returns 0 once find_resend_waiter() reports no remaining lkb, or -EINTR
 * if dlm_locking_stopped() becomes true before the list has been drained.
 */
int dlm_recover_waiters_post(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error = 0, mstype, err, oc, ou;

	while (1) {
		if (dlm_locking_stopped(ls)) {
			log_debug(ls, "recover_waiters_post aborted");
			error = -EINTR;
			break;
		}

		/*
		 * Find an lkb from the waiters list that's been affected by
		 * recovery node changes, and needs to be reprocessed.  Does
		 * hold_lkb(), adding a refcount.
		 */
		lkb = find_resend_waiter(ls);
		if (!lkb)
			break;

		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		/*
		 * If the lkb has been flagged for a force unlock or cancel,
		 * then the reprocessing below will be replaced by just doing
		 * the unlock/cancel directly.
		 */
		/* snapshot the wait type before it is cleared below */
		mstype = lkb->lkb_wait_type;
		oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
					&lkb->lkb_iflags);
		ou = test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT,
					&lkb->lkb_iflags);
		err = 0;

		log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
			  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
			  "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
			  r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
			  dlm_dir_nodeid(r), oc, ou);

		/*
		 * No reply to the pre-recovery operation will now be received,
		 * so a forced equivalent of remove_from_waiters() is needed to
		 * reset the waiters state that was in place before recovery.
		 */

		clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);

		/* Forcibly clear wait_type */
		lkb->lkb_wait_type = 0;

		/*
		 * Forcibly reset wait_count and associated refcount.  The
		 * wait_count will almost always be 1, but in case of an
		 * overlapping unlock/cancel it could be 2: see where
		 * add_to_waiters() finds the lkb is already on the waiters
		 * list and does lkb_wait_count++; hold_lkb().
		 */
		while (lkb->lkb_wait_count) {
			lkb->lkb_wait_count--;
			unhold_lkb(lkb);
		}

		/* Forcibly remove from waiters list */
		spin_lock_bh(&ls->ls_waiters_lock);
		list_del_init(&lkb->lkb_wait_reply);
		spin_unlock_bh(&ls->ls_waiters_lock);

		/*
		 * The lkb is now clear of all prior waiters state and can be
		 * processed locally, or sent to remote node again, or directly
		 * cancelled/unlocked.
		 */

		if (oc || ou) {
			/* do an unlock or cancel instead of resending */
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				/* unlock result wins when both flags were set */
				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
							-DLM_ECANCEL);
				unhold_lkb(lkb); /* undoes create_lkb() */
				break;
			case DLM_MSG_CONVERT:
				if (oc) {
					queue_cast(r, lkb, -DLM_ECANCEL);
				} else {
					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
					_unlock_lock(r, lkb);
				}
				break;
			default:
				/* unexpected wait type; reported below */
				err = 1;
			}
		} else {
			/* no overlap: redo the original operation */
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				_request_lock(r, lkb);
				if (r->res_nodeid != -1 && is_master(r))
					confirm_master(r, 0);
				break;
			case DLM_MSG_CONVERT:
				_convert_lock(r, lkb);
				break;
			default:
				/* unexpected wait type; reported below */
				err = 1;
			}
		}

		if (err) {
			log_error(ls, "waiter %x msg %d r_nodeid %d "
				  "dir_nodeid %d overlap %d %d",
				  lkb->lkb_id, mstype, r->res_nodeid,
				  dlm_dir_nodeid(r), oc, ou);
		}
		unlock_rsb(r);
		put_rsb(r);
		/* pairs with the hold_lkb() done in find_resend_waiter() */
		dlm_put_lkb(lkb);
	}

	return error;
}
5337 
5338 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5339 			      struct list_head *list)
5340 {
5341 	struct dlm_lkb *lkb, *safe;
5342 
5343 	list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5344 		if (!is_master_copy(lkb))
5345 			continue;
5346 
5347 		/* don't purge lkbs we've added in recover_master_copy for
5348 		   the current recovery seq */
5349 
5350 		if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5351 			continue;
5352 
5353 		del_lkb(r, lkb);
5354 
5355 		/* this put should free the lkb */
5356 		if (!dlm_put_lkb(lkb))
5357 			log_error(ls, "purged mstcpy lkb not released");
5358 	}
5359 }
5360 
5361 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5362 {
5363 	struct dlm_ls *ls = r->res_ls;
5364 
5365 	purge_mstcpy_list(ls, r, &r->res_grantqueue);
5366 	purge_mstcpy_list(ls, r, &r->res_convertqueue);
5367 	purge_mstcpy_list(ls, r, &r->res_waitqueue);
5368 }
5369 
5370 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5371 			    struct list_head *list,
5372 			    int nodeid_gone, unsigned int *count)
5373 {
5374 	struct dlm_lkb *lkb, *safe;
5375 
5376 	list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5377 		if (!is_master_copy(lkb))
5378 			continue;
5379 
5380 		if ((lkb->lkb_nodeid == nodeid_gone) ||
5381 		    dlm_is_removed(ls, lkb->lkb_nodeid)) {
5382 
5383 			/* tell recover_lvb to invalidate the lvb
5384 			   because a node holding EX/PW failed */
5385 			if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5386 			    (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5387 				rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5388 			}
5389 
5390 			del_lkb(r, lkb);
5391 
5392 			/* this put should free the lkb */
5393 			if (!dlm_put_lkb(lkb))
5394 				log_error(ls, "purged dead lkb not released");
5395 
5396 			rsb_set_flag(r, RSB_RECOVER_GRANT);
5397 
5398 			(*count)++;
5399 		}
5400 	}
5401 }
5402 
5403 /* Get rid of locks held by nodes that are gone. */
5404 
5405 void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list)
5406 {
5407 	struct dlm_rsb *r;
5408 	struct dlm_member *memb;
5409 	int nodes_count = 0;
5410 	int nodeid_gone = 0;
5411 	unsigned int lkb_count = 0;
5412 
5413 	/* cache one removed nodeid to optimize the common
5414 	   case of a single node removed */
5415 
5416 	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5417 		nodes_count++;
5418 		nodeid_gone = memb->nodeid;
5419 	}
5420 
5421 	if (!nodes_count)
5422 		return;
5423 
5424 	list_for_each_entry(r, root_list, res_root_list) {
5425 		lock_rsb(r);
5426 		if (r->res_nodeid != -1 && is_master(r)) {
5427 			purge_dead_list(ls, r, &r->res_grantqueue,
5428 					nodeid_gone, &lkb_count);
5429 			purge_dead_list(ls, r, &r->res_convertqueue,
5430 					nodeid_gone, &lkb_count);
5431 			purge_dead_list(ls, r, &r->res_waitqueue,
5432 					nodeid_gone, &lkb_count);
5433 		}
5434 		unlock_rsb(r);
5435 
5436 		cond_resched();
5437 	}
5438 
5439 	if (lkb_count)
5440 		log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5441 			  lkb_count, nodes_count);
5442 }
5443 
5444 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
5445 {
5446 	struct dlm_rsb *r;
5447 
5448 	read_lock_bh(&ls->ls_rsbtbl_lock);
5449 	list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
5450 		if (!rsb_flag(r, RSB_RECOVER_GRANT))
5451 			continue;
5452 		if (!is_master(r)) {
5453 			rsb_clear_flag(r, RSB_RECOVER_GRANT);
5454 			continue;
5455 		}
5456 		hold_rsb(r);
5457 		read_unlock_bh(&ls->ls_rsbtbl_lock);
5458 		return r;
5459 	}
5460 	read_unlock_bh(&ls->ls_rsbtbl_lock);
5461 	return NULL;
5462 }
5463 
5464 /*
5465  * Attempt to grant locks on resources that we are the master of.
5466  * Locks may have become grantable during recovery because locks
5467  * from departed nodes have been purged (or not rebuilt), allowing
5468  * previously blocked locks to now be granted.  The subset of rsb's
5469  * we are interested in are those with lkb's on either the convert or
5470  * waiting queues.
5471  *
5472  * Simplest would be to go through each master rsb and check for non-empty
5473  * convert or waiting queues, and attempt to grant on those rsbs.
5474  * Checking the queues requires lock_rsb, though, for which we'd need
5475  * to release the rsbtbl lock.  This would make iterating through all
5476  * rsb's very inefficient.  So, we rely on earlier recovery routines
5477  * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5478  * locks for.
5479  */
5480 
5481 void dlm_recover_grant(struct dlm_ls *ls)
5482 {
5483 	struct dlm_rsb *r;
5484 	unsigned int count = 0;
5485 	unsigned int rsb_count = 0;
5486 	unsigned int lkb_count = 0;
5487 
5488 	while (1) {
5489 		r = find_grant_rsb(ls);
5490 		if (!r)
5491 			break;
5492 
5493 		rsb_count++;
5494 		count = 0;
5495 		lock_rsb(r);
5496 		/* the RECOVER_GRANT flag is checked in the grant path */
5497 		grant_pending_locks(r, &count);
5498 		rsb_clear_flag(r, RSB_RECOVER_GRANT);
5499 		lkb_count += count;
5500 		confirm_master(r, 0);
5501 		unlock_rsb(r);
5502 		put_rsb(r);
5503 		cond_resched();
5504 	}
5505 
5506 	if (lkb_count)
5507 		log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5508 			  lkb_count, rsb_count);
5509 }
5510 
5511 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5512 					 uint32_t remid)
5513 {
5514 	struct dlm_lkb *lkb;
5515 
5516 	list_for_each_entry(lkb, head, lkb_statequeue) {
5517 		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5518 			return lkb;
5519 	}
5520 	return NULL;
5521 }
5522 
5523 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5524 				    uint32_t remid)
5525 {
5526 	struct dlm_lkb *lkb;
5527 
5528 	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5529 	if (lkb)
5530 		return lkb;
5531 	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5532 	if (lkb)
5533 		return lkb;
5534 	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5535 	if (lkb)
5536 		return lkb;
5537 	return NULL;
5538 }
5539 
5540 /* needs at least dlm_rcom + rcom_lock */
5541 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5542 				  struct dlm_rsb *r, const struct dlm_rcom *rc)
5543 {
5544 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5545 
5546 	lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5547 	lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5548 	lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5549 	lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5550 	dlm_set_dflags_val(lkb, le32_to_cpu(rl->rl_flags));
5551 	set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
5552 	lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5553 	lkb->lkb_rqmode = rl->rl_rqmode;
5554 	lkb->lkb_grmode = rl->rl_grmode;
5555 	/* don't set lkb_status because add_lkb wants to itself */
5556 
5557 	lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5558 	lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5559 
5560 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5561 		int lvblen = le16_to_cpu(rc->rc_header.h_length) -
5562 			sizeof(struct dlm_rcom) - sizeof(struct rcom_lock);
5563 		if (lvblen > ls->ls_lvblen)
5564 			return -EINVAL;
5565 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5566 		if (!lkb->lkb_lvbptr)
5567 			return -ENOMEM;
5568 		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5569 	}
5570 
5571 	/* Conversions between PR and CW (middle modes) need special handling.
5572 	   The real granted mode of these converting locks cannot be determined
5573 	   until all locks have been rebuilt on the rsb (recover_conversion) */
5574 
5575 	if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
5576 	    middle_conversion(lkb)) {
5577 		rl->rl_status = DLM_LKSTS_CONVERT;
5578 		lkb->lkb_grmode = DLM_LOCK_IV;
5579 		rsb_set_flag(r, RSB_RECOVER_CONVERT);
5580 	}
5581 
5582 	return 0;
5583 }
5584 
/* This lkb may have been recovered in a previous aborted recovery so we need
   to check if the rsb already has an lkb with the given remote nodeid/lkid.
   If so we just send back a standard reply.  If not, we create a new lkb with
   the given values and send back our lkid.  We send back our lkid by sending
   back the rcom_lock struct we got but with the remid field filled in.

   Outputs: *rl_remid starts as the received rl_remid and is replaced by the
   local lkb id when an lkb is found or created; *rl_result always receives
   the return value in little-endian form.

   Returns 0 on success, -EEXIST when a matching lkb already exists,
   -EOPNOTSUPP when rl_parent_lkid is set, -EBADR for the no-directory
   mismatch below, or the error from find_rsb(), create_lkb() or
   receive_rcom_lock_args(). */

/* needs at least dlm_rcom + rcom_lock */
int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
			    __le32 *rl_remid, __le32 *rl_result)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	uint32_t remid = 0;
	int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
	int error;

	/* init rl_remid with rcom lock rl_remid */
	*rl_remid = rl->rl_remid;

	/* a non-zero parent lkid is rejected outright */
	if (rl->rl_parent_lkid) {
		error = -EOPNOTSUPP;
		goto out;
	}

	remid = le32_to_cpu(rl->rl_lkid);

	/* In general we expect the rsb returned to be R_MASTER, but we don't
	   have to require it.  Recovery of masters on one node can overlap
	   recovery of locks on another node, so one node can send us MSTCPY
	   locks before we've made ourselves master of this rsb.  We can still
	   add new MSTCPY locks that we receive here without any harm; when
	   we make ourselves master, dlm_recover_masters() won't touch the
	   MSTCPY locks we've received early. */

	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
			 from_nodeid, R_RECEIVE_RECOVER, &r);
	if (error)
		goto out;

	lock_rsb(r);

	if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
		log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
			  from_nodeid, remid);
		error = -EBADR;
		goto out_unlock;
	}

	/* already present (e.g. from an earlier aborted recovery): skip
	   creation but still report our lkid and refresh the recover seq */
	lkb = search_remid(r, from_nodeid, remid);
	if (lkb) {
		error = -EEXIST;
		goto out_remid;
	}

	error = create_lkb(ls, &lkb);
	if (error)
		goto out_unlock;

	error = receive_rcom_lock_args(ls, lkb, r, rc);
	if (error) {
		/* release the lkb created just above */
		__put_lkb(ls, lkb);
		goto out_unlock;
	}

	attach_lkb(r, lkb);
	/* rl_status may have been rewritten by receive_rcom_lock_args() */
	add_lkb(r, lkb, rl->rl_status);
	ls->ls_recover_locks_in++;

	if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
		rsb_set_flag(r, RSB_RECOVER_GRANT);

 out_remid:
	/* this is the new value returned to the lock holder for
	   saving in its process-copy lkb */
	*rl_remid = cpu_to_le32(lkb->lkb_id);

	/* stamp the lkb with the current recovery sequence */
	lkb->lkb_recover_seq = ls->ls_recover_seq;

 out_unlock:
	unlock_rsb(r);
	put_rsb(r);
 out:
	/* -EEXIST is an expected outcome and is not logged */
	if (error && error != -EEXIST)
		log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
			  from_nodeid, remid, error);
	*rl_result = cpu_to_le32(error);
	return error;
}
5674 
5675 /* needs at least dlm_rcom + rcom_lock */
5676 int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5677 			     uint64_t seq)
5678 {
5679 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5680 	struct dlm_rsb *r;
5681 	struct dlm_lkb *lkb;
5682 	uint32_t lkid, remid;
5683 	int error, result;
5684 
5685 	lkid = le32_to_cpu(rl->rl_lkid);
5686 	remid = le32_to_cpu(rl->rl_remid);
5687 	result = le32_to_cpu(rl->rl_result);
5688 
5689 	error = find_lkb(ls, lkid, &lkb);
5690 	if (error) {
5691 		log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5692 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5693 			  result);
5694 		return error;
5695 	}
5696 
5697 	r = lkb->lkb_resource;
5698 	hold_rsb(r);
5699 	lock_rsb(r);
5700 
5701 	if (!is_process_copy(lkb)) {
5702 		log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5703 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5704 			  result);
5705 		dlm_dump_rsb(r);
5706 		unlock_rsb(r);
5707 		put_rsb(r);
5708 		dlm_put_lkb(lkb);
5709 		return -EINVAL;
5710 	}
5711 
5712 	switch (result) {
5713 	case -EBADR:
5714 		/* There's a chance the new master received our lock before
5715 		   dlm_recover_master_reply(), this wouldn't happen if we did
5716 		   a barrier between recover_masters and recover_locks. */
5717 
5718 		log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5719 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5720 			  result);
5721 
5722 		dlm_send_rcom_lock(r, lkb, seq);
5723 		goto out;
5724 	case -EEXIST:
5725 	case 0:
5726 		lkb->lkb_remid = remid;
5727 		break;
5728 	default:
5729 		log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5730 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5731 			  result);
5732 	}
5733 
5734 	/* an ack for dlm_recover_locks() which waits for replies from
5735 	   all the locks it sends to new masters */
5736 	dlm_recovered_lock(r);
5737  out:
5738 	unlock_rsb(r);
5739 	put_rsb(r);
5740 	dlm_put_lkb(lkb);
5741 
5742 	return 0;
5743 }
5744 
/*
 * Create a new lkb for a userspace lock request and pass it to
 * request_lock().
 *
 * @ua is consumed: it is kfree()d here on every failure that happens
 * before request_lock() is called; afterwards it is released together with
 * the lkb (see the comment ahead of request_lock() below).
 *
 * Returns 0 when request_lock() returns 0, -EINPROGRESS or -EAGAIN.  In
 * the first two cases the lkb is added to ua->proc->locks with an extra
 * reference; in the -EAGAIN case, and for any other error (which is
 * returned as is), the lkb is put instead.  Returns the create_lkb() or
 * set_lock_args() error, or -ENOMEM for a failed lvb allocation.
 */
int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
		     int mode, uint32_t flags, void *name, unsigned int namelen)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	bool do_put = true;
	int error;

	dlm_lock_recovery(ls);

	error = create_lkb(ls, &lkb);
	if (error) {
		kfree(ua);
		goto out;
	}

	trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);

	/* a value block was requested: give the lksb a zeroed buffer */
	if (flags & DLM_LKF_VALBLK) {
		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
		if (!ua->lksb.sb_lvbptr) {
			kfree(ua);
			error = -ENOMEM;
			goto out_put;
		}
	}
	error = set_lock_args(mode, &ua->lksb, flags, namelen, fake_astfn, ua,
			      fake_bastfn, &args);
	if (error) {
		/* kfree(NULL) is fine when no lvb was allocated above */
		kfree(ua->lksb.sb_lvbptr);
		ua->lksb.sb_lvbptr = NULL;
		kfree(ua);
		goto out_put;
	}

	/* After ua is attached to lkb it will be freed by dlm_free_lkb().
	   When DLM_DFL_USER_BIT is set, the dlm knows that this is a userspace
	   lock and that lkb_astparam is the dlm_user_args structure. */
	set_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags);
	error = request_lock(ls, lkb, name, namelen, &args);

	switch (error) {
	case 0:
		break;
	case -EINPROGRESS:
		error = 0;
		break;
	case -EAGAIN:
		/* reported to the caller as success, but the lkb is still
		   put below and never reaches the proc list */
		error = 0;
		fallthrough;
	default:
		goto out_put;
	}

	/* add this new lkb to the per-process list of locks */
	spin_lock_bh(&ua->proc->locks_spin);
	hold_lkb(lkb);
	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
	spin_unlock_bh(&ua->proc->locks_spin);
	/* keep the lkb: skip the put below */
	do_put = false;
 out_put:
	trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
	if (do_put)
		__put_lkb(ls, lkb);
 out:
	dlm_unlock_recovery(ls);
	return error;
}
5813 
5814 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5815 		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
5816 {
5817 	struct dlm_lkb *lkb;
5818 	struct dlm_args args;
5819 	struct dlm_user_args *ua;
5820 	int error;
5821 
5822 	dlm_lock_recovery(ls);
5823 
5824 	error = find_lkb(ls, lkid, &lkb);
5825 	if (error)
5826 		goto out;
5827 
5828 	trace_dlm_lock_start(ls, lkb, NULL, 0, mode, flags);
5829 
5830 	/* user can change the params on its lock when it converts it, or
5831 	   add an lvb that didn't exist before */
5832 
5833 	ua = lkb->lkb_ua;
5834 
5835 	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5836 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5837 		if (!ua->lksb.sb_lvbptr) {
5838 			error = -ENOMEM;
5839 			goto out_put;
5840 		}
5841 	}
5842 	if (lvb_in && ua->lksb.sb_lvbptr)
5843 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5844 
5845 	ua->xid = ua_tmp->xid;
5846 	ua->castparam = ua_tmp->castparam;
5847 	ua->castaddr = ua_tmp->castaddr;
5848 	ua->bastparam = ua_tmp->bastparam;
5849 	ua->bastaddr = ua_tmp->bastaddr;
5850 	ua->user_lksb = ua_tmp->user_lksb;
5851 
5852 	error = set_lock_args(mode, &ua->lksb, flags, 0, fake_astfn, ua,
5853 			      fake_bastfn, &args);
5854 	if (error)
5855 		goto out_put;
5856 
5857 	error = convert_lock(ls, lkb, &args);
5858 
5859 	if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5860 		error = 0;
5861  out_put:
5862 	trace_dlm_lock_end(ls, lkb, NULL, 0, mode, flags, error, false);
5863 	dlm_put_lkb(lkb);
5864  out:
5865 	dlm_unlock_recovery(ls);
5866 	kfree(ua_tmp);
5867 	return error;
5868 }
5869 
5870 /*
5871  * The caller asks for an orphan lock on a given resource with a given mode.
5872  * If a matching lock exists, it's moved to the owner's list of locks and
5873  * the lkid is returned.
5874  */
5875 
5876 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5877 		     int mode, uint32_t flags, void *name, unsigned int namelen,
5878 		     uint32_t *lkid)
5879 {
5880 	struct dlm_lkb *lkb = NULL, *iter;
5881 	struct dlm_user_args *ua;
5882 	int found_other_mode = 0;
5883 	int rv = 0;
5884 
5885 	spin_lock_bh(&ls->ls_orphans_lock);
5886 	list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
5887 		if (iter->lkb_resource->res_length != namelen)
5888 			continue;
5889 		if (memcmp(iter->lkb_resource->res_name, name, namelen))
5890 			continue;
5891 		if (iter->lkb_grmode != mode) {
5892 			found_other_mode = 1;
5893 			continue;
5894 		}
5895 
5896 		lkb = iter;
5897 		list_del_init(&iter->lkb_ownqueue);
5898 		clear_bit(DLM_DFL_ORPHAN_BIT, &iter->lkb_dflags);
5899 		*lkid = iter->lkb_id;
5900 		break;
5901 	}
5902 	spin_unlock_bh(&ls->ls_orphans_lock);
5903 
5904 	if (!lkb && found_other_mode) {
5905 		rv = -EAGAIN;
5906 		goto out;
5907 	}
5908 
5909 	if (!lkb) {
5910 		rv = -ENOENT;
5911 		goto out;
5912 	}
5913 
5914 	lkb->lkb_exflags = flags;
5915 	lkb->lkb_ownpid = (int) current->pid;
5916 
5917 	ua = lkb->lkb_ua;
5918 
5919 	ua->proc = ua_tmp->proc;
5920 	ua->xid = ua_tmp->xid;
5921 	ua->castparam = ua_tmp->castparam;
5922 	ua->castaddr = ua_tmp->castaddr;
5923 	ua->bastparam = ua_tmp->bastparam;
5924 	ua->bastaddr = ua_tmp->bastaddr;
5925 	ua->user_lksb = ua_tmp->user_lksb;
5926 
5927 	/*
5928 	 * The lkb reference from the ls_orphans list was not
5929 	 * removed above, and is now considered the reference
5930 	 * for the proc locks list.
5931 	 */
5932 
5933 	spin_lock_bh(&ua->proc->locks_spin);
5934 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5935 	spin_unlock_bh(&ua->proc->locks_spin);
5936  out:
5937 	kfree(ua_tmp);
5938 	return rv;
5939 }
5940 
5941 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5942 		    uint32_t flags, uint32_t lkid, char *lvb_in)
5943 {
5944 	struct dlm_lkb *lkb;
5945 	struct dlm_args args;
5946 	struct dlm_user_args *ua;
5947 	int error;
5948 
5949 	dlm_lock_recovery(ls);
5950 
5951 	error = find_lkb(ls, lkid, &lkb);
5952 	if (error)
5953 		goto out;
5954 
5955 	trace_dlm_unlock_start(ls, lkb, flags);
5956 
5957 	ua = lkb->lkb_ua;
5958 
5959 	if (lvb_in && ua->lksb.sb_lvbptr)
5960 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5961 	if (ua_tmp->castparam)
5962 		ua->castparam = ua_tmp->castparam;
5963 	ua->user_lksb = ua_tmp->user_lksb;
5964 
5965 	error = set_unlock_args(flags, ua, &args);
5966 	if (error)
5967 		goto out_put;
5968 
5969 	error = unlock_lock(ls, lkb, &args);
5970 
5971 	if (error == -DLM_EUNLOCK)
5972 		error = 0;
5973 	/* from validate_unlock_args() */
5974 	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5975 		error = 0;
5976 	if (error)
5977 		goto out_put;
5978 
5979 	spin_lock_bh(&ua->proc->locks_spin);
5980 	/* dlm_user_add_cb() may have already taken lkb off the proc list */
5981 	if (!list_empty(&lkb->lkb_ownqueue))
5982 		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5983 	spin_unlock_bh(&ua->proc->locks_spin);
5984  out_put:
5985 	trace_dlm_unlock_end(ls, lkb, flags, error);
5986 	dlm_put_lkb(lkb);
5987  out:
5988 	dlm_unlock_recovery(ls);
5989 	kfree(ua_tmp);
5990 	return error;
5991 }
5992 
5993 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5994 		    uint32_t flags, uint32_t lkid)
5995 {
5996 	struct dlm_lkb *lkb;
5997 	struct dlm_args args;
5998 	struct dlm_user_args *ua;
5999 	int error;
6000 
6001 	dlm_lock_recovery(ls);
6002 
6003 	error = find_lkb(ls, lkid, &lkb);
6004 	if (error)
6005 		goto out;
6006 
6007 	trace_dlm_unlock_start(ls, lkb, flags);
6008 
6009 	ua = lkb->lkb_ua;
6010 	if (ua_tmp->castparam)
6011 		ua->castparam = ua_tmp->castparam;
6012 	ua->user_lksb = ua_tmp->user_lksb;
6013 
6014 	error = set_unlock_args(flags, ua, &args);
6015 	if (error)
6016 		goto out_put;
6017 
6018 	error = cancel_lock(ls, lkb, &args);
6019 
6020 	if (error == -DLM_ECANCEL)
6021 		error = 0;
6022 	/* from validate_unlock_args() */
6023 	if (error == -EBUSY)
6024 		error = 0;
6025  out_put:
6026 	trace_dlm_unlock_end(ls, lkb, flags, error);
6027 	dlm_put_lkb(lkb);
6028  out:
6029 	dlm_unlock_recovery(ls);
6030 	kfree(ua_tmp);
6031 	return error;
6032 }
6033 
6034 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
6035 {
6036 	struct dlm_lkb *lkb;
6037 	struct dlm_args args;
6038 	struct dlm_user_args *ua;
6039 	struct dlm_rsb *r;
6040 	int error;
6041 
6042 	dlm_lock_recovery(ls);
6043 
6044 	error = find_lkb(ls, lkid, &lkb);
6045 	if (error)
6046 		goto out;
6047 
6048 	trace_dlm_unlock_start(ls, lkb, flags);
6049 
6050 	ua = lkb->lkb_ua;
6051 
6052 	error = set_unlock_args(flags, ua, &args);
6053 	if (error)
6054 		goto out_put;
6055 
6056 	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
6057 
6058 	r = lkb->lkb_resource;
6059 	hold_rsb(r);
6060 	lock_rsb(r);
6061 
6062 	error = validate_unlock_args(lkb, &args);
6063 	if (error)
6064 		goto out_r;
6065 	set_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags);
6066 
6067 	error = _cancel_lock(r, lkb);
6068  out_r:
6069 	unlock_rsb(r);
6070 	put_rsb(r);
6071 
6072 	if (error == -DLM_ECANCEL)
6073 		error = 0;
6074 	/* from validate_unlock_args() */
6075 	if (error == -EBUSY)
6076 		error = 0;
6077  out_put:
6078 	trace_dlm_unlock_end(ls, lkb, flags, error);
6079 	dlm_put_lkb(lkb);
6080  out:
6081 	dlm_unlock_recovery(ls);
6082 	return error;
6083 }
6084 
6085 /* lkb's that are removed from the waiters list by revert are just left on the
6086    orphans list with the granted orphan locks, to be freed by purge */
6087 
6088 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6089 {
6090 	struct dlm_args args;
6091 	int error;
6092 
6093 	hold_lkb(lkb); /* reference for the ls_orphans list */
6094 	spin_lock_bh(&ls->ls_orphans_lock);
6095 	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6096 	spin_unlock_bh(&ls->ls_orphans_lock);
6097 
6098 	set_unlock_args(0, lkb->lkb_ua, &args);
6099 
6100 	error = cancel_lock(ls, lkb, &args);
6101 	if (error == -DLM_ECANCEL)
6102 		error = 0;
6103 	return error;
6104 }
6105 
6106 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
6107    granted.  Regardless of what rsb queue the lock is on, it's removed and
6108    freed.  The IVVALBLK flag causes the lvb on the resource to be invalidated
6109    if our lock is PW/EX (it's ignored if our granted mode is smaller.) */
6110 
6111 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6112 {
6113 	struct dlm_args args;
6114 	int error;
6115 
6116 	set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
6117 			lkb->lkb_ua, &args);
6118 
6119 	error = unlock_lock(ls, lkb, &args);
6120 	if (error == -DLM_EUNLOCK)
6121 		error = 0;
6122 	return error;
6123 }
6124 
/* We have to release the ls_clear_proc_locks spinlock before calling
   unlock_proc_lock() (which does lock_rsb) due to deadlock with receiving a
   message that does lock_rsb followed by dlm_user_add_cb() */
6128 
6129 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
6130 				     struct dlm_user_proc *proc)
6131 {
6132 	struct dlm_lkb *lkb = NULL;
6133 
6134 	spin_lock_bh(&ls->ls_clear_proc_locks);
6135 	if (list_empty(&proc->locks))
6136 		goto out;
6137 
6138 	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
6139 	list_del_init(&lkb->lkb_ownqueue);
6140 
6141 	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6142 		set_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags);
6143 	else
6144 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6145  out:
6146 	spin_unlock_bh(&ls->ls_clear_proc_locks);
6147 	return lkb;
6148 }
6149 
/* The ls_clear_proc_locks spinlock protects against dlm_user_add_cb() which
   1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
   which we clear here. */
6153 
6154 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
6155    list, and no more device_writes should add lkb's to proc->locks list; so we
6156    shouldn't need to take asts_spin or locks_spin here.  this assumes that
6157    device reads/writes/closes are serialized -- FIXME: we may need to serialize
6158    them ourself. */
6159 
6160 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6161 {
6162 	struct dlm_callback *cb, *cb_safe;
6163 	struct dlm_lkb *lkb, *safe;
6164 
6165 	dlm_lock_recovery(ls);
6166 
6167 	while (1) {
6168 		lkb = del_proc_lock(ls, proc);
6169 		if (!lkb)
6170 			break;
6171 		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6172 			orphan_proc_lock(ls, lkb);
6173 		else
6174 			unlock_proc_lock(ls, lkb);
6175 
6176 		/* this removes the reference for the proc->locks list
6177 		   added by dlm_user_request, it may result in the lkb
6178 		   being freed */
6179 
6180 		dlm_put_lkb(lkb);
6181 	}
6182 
6183 	spin_lock_bh(&ls->ls_clear_proc_locks);
6184 
6185 	/* in-progress unlocks */
6186 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6187 		list_del_init(&lkb->lkb_ownqueue);
6188 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6189 		dlm_put_lkb(lkb);
6190 	}
6191 
6192 	list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6193 		list_del(&cb->list);
6194 		dlm_free_cb(cb);
6195 	}
6196 
6197 	spin_unlock_bh(&ls->ls_clear_proc_locks);
6198 	dlm_unlock_recovery(ls);
6199 }
6200 
6201 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6202 {
6203 	struct dlm_callback *cb, *cb_safe;
6204 	struct dlm_lkb *lkb, *safe;
6205 
6206 	while (1) {
6207 		lkb = NULL;
6208 		spin_lock_bh(&proc->locks_spin);
6209 		if (!list_empty(&proc->locks)) {
6210 			lkb = list_entry(proc->locks.next, struct dlm_lkb,
6211 					 lkb_ownqueue);
6212 			list_del_init(&lkb->lkb_ownqueue);
6213 		}
6214 		spin_unlock_bh(&proc->locks_spin);
6215 
6216 		if (!lkb)
6217 			break;
6218 
6219 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6220 		unlock_proc_lock(ls, lkb);
6221 		dlm_put_lkb(lkb); /* ref from proc->locks list */
6222 	}
6223 
6224 	spin_lock_bh(&proc->locks_spin);
6225 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6226 		list_del_init(&lkb->lkb_ownqueue);
6227 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6228 		dlm_put_lkb(lkb);
6229 	}
6230 	spin_unlock_bh(&proc->locks_spin);
6231 
6232 	spin_lock_bh(&proc->asts_spin);
6233 	list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6234 		list_del(&cb->list);
6235 		dlm_free_cb(cb);
6236 	}
6237 	spin_unlock_bh(&proc->asts_spin);
6238 }
6239 
6240 /* pid of 0 means purge all orphans */
6241 
6242 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6243 {
6244 	struct dlm_lkb *lkb, *safe;
6245 
6246 	spin_lock_bh(&ls->ls_orphans_lock);
6247 	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6248 		if (pid && lkb->lkb_ownpid != pid)
6249 			continue;
6250 		unlock_proc_lock(ls, lkb);
6251 		list_del_init(&lkb->lkb_ownqueue);
6252 		dlm_put_lkb(lkb);
6253 	}
6254 	spin_unlock_bh(&ls->ls_orphans_lock);
6255 }
6256 
6257 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6258 {
6259 	struct dlm_message *ms;
6260 	struct dlm_mhandle *mh;
6261 	int error;
6262 
6263 	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6264 				DLM_MSG_PURGE, &ms, &mh);
6265 	if (error)
6266 		return error;
6267 	ms->m_nodeid = cpu_to_le32(nodeid);
6268 	ms->m_pid = cpu_to_le32(pid);
6269 
6270 	return send_message(mh, ms, NULL, 0);
6271 }
6272 
6273 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6274 		   int nodeid, int pid)
6275 {
6276 	int error = 0;
6277 
6278 	if (nodeid && (nodeid != dlm_our_nodeid())) {
6279 		error = send_purge(ls, nodeid, pid);
6280 	} else {
6281 		dlm_lock_recovery(ls);
6282 		if (pid == current->pid)
6283 			purge_proc_locks(ls, proc);
6284 		else
6285 			do_purge(ls, nodeid, pid);
6286 		dlm_unlock_recovery(ls);
6287 	}
6288 	return error;
6289 }
6290 
6291 /* debug functionality */
6292 int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
6293 		      int lkb_nodeid, unsigned int lkb_dflags, int lkb_status)
6294 {
6295 	struct dlm_lksb *lksb;
6296 	struct dlm_lkb *lkb;
6297 	struct dlm_rsb *r;
6298 	int error;
6299 
6300 	/* we currently can't set a valid user lock */
6301 	if (lkb_dflags & BIT(DLM_DFL_USER_BIT))
6302 		return -EOPNOTSUPP;
6303 
6304 	lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
6305 	if (!lksb)
6306 		return -ENOMEM;
6307 
6308 	error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
6309 	if (error) {
6310 		kfree(lksb);
6311 		return error;
6312 	}
6313 
6314 	dlm_set_dflags_val(lkb, lkb_dflags);
6315 	lkb->lkb_nodeid = lkb_nodeid;
6316 	lkb->lkb_lksb = lksb;
6317 	/* user specific pointer, just don't have it NULL for kernel locks */
6318 	if (~lkb_dflags & BIT(DLM_DFL_USER_BIT))
6319 		lkb->lkb_astparam = (void *)0xDEADBEEF;
6320 
6321 	error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
6322 	if (error) {
6323 		kfree(lksb);
6324 		__put_lkb(ls, lkb);
6325 		return error;
6326 	}
6327 
6328 	lock_rsb(r);
6329 	attach_lkb(r, lkb);
6330 	add_lkb(r, lkb, lkb_status);
6331 	unlock_rsb(r);
6332 	put_rsb(r);
6333 
6334 	return 0;
6335 }
6336 
6337 int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
6338 				 int mstype, int to_nodeid)
6339 {
6340 	struct dlm_lkb *lkb;
6341 	int error;
6342 
6343 	error = find_lkb(ls, lkb_id, &lkb);
6344 	if (error)
6345 		return error;
6346 
6347 	error = add_to_waiters(lkb, mstype, to_nodeid);
6348 	dlm_put_lkb(lkb);
6349 	return error;
6350 }
6351 
6352