xref: /linux/fs/dlm/lock.c (revision 79d2e1919a2728ef49d938eb20ebd5903c14dfb0)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
6 **
7 **
8 *******************************************************************************
9 ******************************************************************************/
10 
11 /* Central locking logic has four stages:
12 
13    dlm_lock()
14    dlm_unlock()
15 
16    request_lock(ls, lkb)
17    convert_lock(ls, lkb)
18    unlock_lock(ls, lkb)
19    cancel_lock(ls, lkb)
20 
21    _request_lock(r, lkb)
22    _convert_lock(r, lkb)
23    _unlock_lock(r, lkb)
24    _cancel_lock(r, lkb)
25 
26    do_request(r, lkb)
27    do_convert(r, lkb)
28    do_unlock(r, lkb)
29    do_cancel(r, lkb)
30 
31    Stage 1 (lock, unlock) is mainly about checking input args and
32    splitting into one of the four main operations:
33 
34        dlm_lock          = request_lock
35        dlm_lock+CONVERT  = convert_lock
36        dlm_unlock        = unlock_lock
37        dlm_unlock+CANCEL = cancel_lock
38 
39    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
40    provided to the next stage.
41 
42    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
43    When remote, it calls send_xxxx(), when local it calls do_xxxx().
44 
45    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
46    given rsb and lkb and queues callbacks.
47 
48    For remote operations, send_xxxx() results in the corresponding do_xxxx()
49    function being executed on the remote node.  The connecting send/receive
50    calls on local (L) and remote (R) nodes:
51 
52    L: send_xxxx()              ->  R: receive_xxxx()
53                                    R: do_xxxx()
54    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
55 */
56 #include <trace/events/dlm.h>
57 
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "midcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77 
/*
 * Forward declarations of file-local (static) functions, so they can be
 * called from code that appears before their definitions.
 */
static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    const struct dlm_message *ms, bool local);
static int receive_extralen(const struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
/* used as the kref release callback in put_rsb() */
static void deactivate_rsb(struct kref *kref);
93 
94 /*
 * Lock compatibility matrix - thanks Steve
96  * UN = Unlocked state. Not really a state, used as a flag
97  * PD = Padding. Used to make the matrix a nice power of two in size
98  * Other states are the same as the VMS DLM.
99  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
100  */
101 
/*
 * Indexed with mode + 1 on both axes, so a mode value of -1 selects the
 * UN row/column.  Read through modes_compat() and dlm_modes_compat().
 */
static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
113 
114 /*
115  * This defines the direction of transfer of LVB data.
116  * Granted mode is the row; requested mode is the column.
117  * Usage: matrix[grmode+1][rqmode+1]
118  * 1 = LVB is returned to the caller
119  * 0 = LVB is written to the resource
120  * -1 = nothing happens to the LVB
121  */
122 
/*
 * Not static, so this table is visible to other translation units.
 * Both indices are mode + 1, so a mode value of -1 selects UN.
 */
const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD*/
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
134 
/*
 * Look up the compatibility entry for the granted mode of lkb @gr against
 * the requested mode of lkb @rq.  Both arguments are lkb pointers.
 */
#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
137 
138 int dlm_modes_compat(int mode1, int mode2)
139 {
140 	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
141 }
142 
143 /*
144  * Compatibility matrix for conversions with QUECVT set.
145  * Granted mode is the row; requested mode is the column.
146  * Usage: matrix[grmode+1][rqmode+1]
147  */
148 
/*
 * Not referenced in this part of the file; presumably consulted where
 * DLM_LKF_QUECVT conversions are validated - verify at the use site.
 */
static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
160 
161 void dlm_print_lkb(struct dlm_lkb *lkb)
162 {
163 	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
164 	       "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
165 	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166 	       dlm_iflags_val(lkb), lkb->lkb_status, lkb->lkb_rqmode,
167 	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
168 	       (unsigned long long)lkb->lkb_recover_seq);
169 }
170 
171 static void dlm_print_rsb(struct dlm_rsb *r)
172 {
173 	printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
174 	       "rlc %d name %s\n",
175 	       r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
176 	       r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
177 	       r->res_name);
178 }
179 
180 void dlm_dump_rsb(struct dlm_rsb *r)
181 {
182 	struct dlm_lkb *lkb;
183 
184 	dlm_print_rsb(r);
185 
186 	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
187 	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
188 	printk(KERN_ERR "rsb lookup list\n");
189 	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
190 		dlm_print_lkb(lkb);
191 	printk(KERN_ERR "rsb grant queue:\n");
192 	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
193 		dlm_print_lkb(lkb);
194 	printk(KERN_ERR "rsb convert queue:\n");
195 	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
196 		dlm_print_lkb(lkb);
197 	printk(KERN_ERR "rsb wait queue:\n");
198 	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
199 		dlm_print_lkb(lkb);
200 }
201 
202 /* Threads cannot use the lockspace while it's being recovered */
203 
204 void dlm_lock_recovery(struct dlm_ls *ls)
205 {
206 	down_read(&ls->ls_in_recovery);
207 }
208 
209 void dlm_unlock_recovery(struct dlm_ls *ls)
210 {
211 	up_read(&ls->ls_in_recovery);
212 }
213 
214 int dlm_lock_recovery_try(struct dlm_ls *ls)
215 {
216 	return down_read_trylock(&ls->ls_in_recovery);
217 }
218 
219 static inline int can_be_queued(struct dlm_lkb *lkb)
220 {
221 	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
222 }
223 
224 static inline int force_blocking_asts(struct dlm_lkb *lkb)
225 {
226 	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
227 }
228 
229 static inline int is_demoted(struct dlm_lkb *lkb)
230 {
231 	return test_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
232 }
233 
234 static inline int is_altmode(struct dlm_lkb *lkb)
235 {
236 	return test_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
237 }
238 
239 static inline int is_granted(struct dlm_lkb *lkb)
240 {
241 	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
242 }
243 
244 static inline int is_remote(struct dlm_rsb *r)
245 {
246 	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
247 	return !!r->res_nodeid;
248 }
249 
250 static inline int is_process_copy(struct dlm_lkb *lkb)
251 {
252 	return lkb->lkb_nodeid &&
253 	       !test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
254 }
255 
256 static inline int is_master_copy(struct dlm_lkb *lkb)
257 {
258 	return test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
259 }
260 
261 static inline int middle_conversion(struct dlm_lkb *lkb)
262 {
263 	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264 	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265 		return 1;
266 	return 0;
267 }
268 
269 static inline int down_conversion(struct dlm_lkb *lkb)
270 {
271 	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272 }
273 
274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
275 {
276 	return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
277 }
278 
279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
280 {
281 	return test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
282 }
283 
284 static inline int is_overlap(struct dlm_lkb *lkb)
285 {
286 	return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags) ||
287 	       test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
288 }
289 
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291 {
292 	if (is_master_copy(lkb))
293 		return;
294 
295 	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
296 
297 	if (rv == -DLM_ECANCEL &&
298 	    test_and_clear_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags))
299 		rv = -EDEADLK;
300 
301 	dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, dlm_sbflags_val(lkb));
302 }
303 
304 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
305 {
306 	queue_cast(r, lkb,
307 		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
308 }
309 
310 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
311 {
312 	if (is_master_copy(lkb)) {
313 		send_bast(r, lkb, rqmode);
314 	} else {
315 		dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
316 	}
317 }
318 
319 /*
320  * Basic operations on rsb's and lkb's
321  */
322 
323 static inline unsigned long rsb_toss_jiffies(void)
324 {
325 	return jiffies + (READ_ONCE(dlm_config.ci_toss_secs) * HZ);
326 }
327 
328 /* This is only called to add a reference when the code already holds
329    a valid reference to the rsb, so there's no need for locking. */
330 
331 static inline void hold_rsb(struct dlm_rsb *r)
332 {
333 	/* inactive rsbs are not ref counted */
334 	WARN_ON(rsb_flag(r, RSB_INACTIVE));
335 	kref_get(&r->res_ref);
336 }
337 
338 void dlm_hold_rsb(struct dlm_rsb *r)
339 {
340 	hold_rsb(r);
341 }
342 
343 /* TODO move this to lib/refcount.c */
344 static __must_check bool
345 dlm_refcount_dec_and_write_lock_bh(refcount_t *r, rwlock_t *lock)
346 __cond_acquires(lock)
347 {
348 	if (refcount_dec_not_one(r))
349 		return false;
350 
351 	write_lock_bh(lock);
352 	if (!refcount_dec_and_test(r)) {
353 		write_unlock_bh(lock);
354 		return false;
355 	}
356 
357 	return true;
358 }
359 
360 /* TODO move this to include/linux/kref.h */
361 static inline int dlm_kref_put_write_lock_bh(struct kref *kref,
362 					     void (*release)(struct kref *kref),
363 					     rwlock_t *lock)
364 {
365 	if (dlm_refcount_dec_and_write_lock_bh(&kref->refcount, lock)) {
366 		release(kref);
367 		return 1;
368 	}
369 
370 	return 0;
371 }
372 
373 static void put_rsb(struct dlm_rsb *r)
374 {
375 	struct dlm_ls *ls = r->res_ls;
376 	int rv;
377 
378 	rv = dlm_kref_put_write_lock_bh(&r->res_ref, deactivate_rsb,
379 					&ls->ls_rsbtbl_lock);
380 	if (rv)
381 		write_unlock_bh(&ls->ls_rsbtbl_lock);
382 }
383 
384 void dlm_put_rsb(struct dlm_rsb *r)
385 {
386 	put_rsb(r);
387 }
388 
/* Works together with timer_delete_sync() in dlm_ls_stop(): once
 * recovery has been triggered no new timer is armed, and the timer
 * does not run again until resume_scan_timer() re-arms it.
 */
393 static void enable_scan_timer(struct dlm_ls *ls, unsigned long jiffies)
394 {
395 	if (!dlm_locking_stopped(ls))
396 		mod_timer(&ls->ls_scan_timer, jiffies);
397 }
398 
/* This function tries to resume the timer callback if an rsb
 * is on the scan list and no timer is pending. It might be that
 * the first entry is currently being handled by the timer
 * callback, but we don't care if a timer is queued up again and
 * does nothing. This should be a rare case.
 */
405 void resume_scan_timer(struct dlm_ls *ls)
406 {
407 	struct dlm_rsb *r;
408 
409 	spin_lock_bh(&ls->ls_scan_lock);
410 	r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
411 				     res_scan_list);
412 	if (r && !timer_pending(&ls->ls_scan_timer))
413 		enable_scan_timer(ls, r->res_toss_time);
414 	spin_unlock_bh(&ls->ls_scan_lock);
415 }
416 
417 /* ls_rsbtbl_lock must be held */
418 
419 static void del_scan(struct dlm_ls *ls, struct dlm_rsb *r)
420 {
421 	struct dlm_rsb *first;
422 
423 	/* active rsbs should never be on the scan list */
424 	WARN_ON(!rsb_flag(r, RSB_INACTIVE));
425 
426 	spin_lock_bh(&ls->ls_scan_lock);
427 	r->res_toss_time = 0;
428 
429 	/* if the rsb is not queued do nothing */
430 	if (list_empty(&r->res_scan_list))
431 		goto out;
432 
433 	/* get the first element before delete */
434 	first = list_first_entry(&ls->ls_scan_list, struct dlm_rsb,
435 				 res_scan_list);
436 	list_del_init(&r->res_scan_list);
437 	/* check if the first element was the rsb we deleted */
438 	if (first == r) {
439 		/* try to get the new first element, if the list
440 		 * is empty now try to delete the timer, if we are
441 		 * too late we don't care.
442 		 *
443 		 * if the list isn't empty and a new first element got
444 		 * in place, set the new timer expire time.
445 		 */
446 		first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
447 						 res_scan_list);
448 		if (!first)
449 			timer_delete(&ls->ls_scan_timer);
450 		else
451 			enable_scan_timer(ls, first->res_toss_time);
452 	}
453 
454 out:
455 	spin_unlock_bh(&ls->ls_scan_lock);
456 }
457 
458 static void add_scan(struct dlm_ls *ls, struct dlm_rsb *r)
459 {
460 	int our_nodeid = dlm_our_nodeid();
461 	struct dlm_rsb *first;
462 
463 	/* A dir record for a remote master rsb should never be on the scan list. */
464 	WARN_ON(!dlm_no_directory(ls) &&
465 		(r->res_master_nodeid != our_nodeid) &&
466 		(dlm_dir_nodeid(r) == our_nodeid));
467 
468 	/* An active rsb should never be on the scan list. */
469 	WARN_ON(!rsb_flag(r, RSB_INACTIVE));
470 
471 	/* An rsb should not already be on the scan list. */
472 	WARN_ON(!list_empty(&r->res_scan_list));
473 
474 	spin_lock_bh(&ls->ls_scan_lock);
475 	/* set the new rsb absolute expire time in the rsb */
476 	r->res_toss_time = rsb_toss_jiffies();
477 	if (list_empty(&ls->ls_scan_list)) {
478 		/* if the queue is empty add the element and it's
479 		 * our new expire time
480 		 */
481 		list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
482 		enable_scan_timer(ls, r->res_toss_time);
483 	} else {
484 		/* try to get the maybe new first element and then add
485 		 * to this rsb with the oldest expire time to the end
486 		 * of the queue. If the list was empty before this
487 		 * rsb expire time is our next expiration if it wasn't
488 		 * the now new first elemet is our new expiration time
489 		 */
490 		first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
491 						 res_scan_list);
492 		list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
493 		if (!first)
494 			enable_scan_timer(ls, r->res_toss_time);
495 		else
496 			enable_scan_timer(ls, first->res_toss_time);
497 	}
498 	spin_unlock_bh(&ls->ls_scan_lock);
499 }
500 
/* If we hit lock contention, retry the trylock in 250 ms.
 * If another mod_timer() happens in between and makes the timer
 * expire earlier, that is fine; this retry only covers the
 * unlikely case that nothing else happens in that time.
 */
/* Expands to an expression that reads jiffies at each use. */
#define DLM_TOSS_TIMER_RETRY	(jiffies + msecs_to_jiffies(250))
507 
508 /* Called by lockspace scan_timer to free unused rsb's. */
509 
510 void dlm_rsb_scan(struct timer_list *timer)
511 {
512 	struct dlm_ls *ls = from_timer(ls, timer, ls_scan_timer);
513 	int our_nodeid = dlm_our_nodeid();
514 	struct dlm_rsb *r;
515 	int rv;
516 
517 	while (1) {
518 		/* interrupting point to leave iteration when
519 		 * recovery waits for timer_delete_sync(), recovery
520 		 * will take care to delete everything in scan list.
521 		 */
522 		if (dlm_locking_stopped(ls))
523 			break;
524 
525 		rv = spin_trylock(&ls->ls_scan_lock);
526 		if (!rv) {
527 			/* rearm again try timer */
528 			enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
529 			break;
530 		}
531 
532 		r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
533 					     res_scan_list);
534 		if (!r) {
535 			/* the next add_scan will enable the timer again */
536 			spin_unlock(&ls->ls_scan_lock);
537 			break;
538 		}
539 
540 		/*
541 		 * If the first rsb is not yet expired, then stop because the
542 		 * list is sorted with nearest expiration first.
543 		 */
544 		if (time_before(jiffies, r->res_toss_time)) {
545 			/* rearm with the next rsb to expire in the future */
546 			enable_scan_timer(ls, r->res_toss_time);
547 			spin_unlock(&ls->ls_scan_lock);
548 			break;
549 		}
550 
551 		/* in find_rsb_dir/nodir there is a reverse order of this
552 		 * lock, however this is only a trylock if we hit some
553 		 * possible contention we try it again.
554 		 */
555 		rv = write_trylock(&ls->ls_rsbtbl_lock);
556 		if (!rv) {
557 			spin_unlock(&ls->ls_scan_lock);
558 			/* rearm again try timer */
559 			enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
560 			break;
561 		}
562 
563 		list_del(&r->res_slow_list);
564 		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
565 				       dlm_rhash_rsb_params);
566 		rsb_clear_flag(r, RSB_HASHED);
567 
568 		/* ls_rsbtbl_lock is not needed when calling send_remove() */
569 		write_unlock(&ls->ls_rsbtbl_lock);
570 
571 		list_del_init(&r->res_scan_list);
572 		spin_unlock(&ls->ls_scan_lock);
573 
574 		/* An rsb that is a dir record for a remote master rsb
575 		 * cannot be removed, and should not have a timer enabled.
576 		 */
577 		WARN_ON(!dlm_no_directory(ls) &&
578 			(r->res_master_nodeid != our_nodeid) &&
579 			(dlm_dir_nodeid(r) == our_nodeid));
580 
581 		/* We're the master of this rsb but we're not
582 		 * the directory record, so we need to tell the
583 		 * dir node to remove the dir record
584 		 */
585 		if (!dlm_no_directory(ls) &&
586 		    (r->res_master_nodeid == our_nodeid) &&
587 		    (dlm_dir_nodeid(r) != our_nodeid))
588 			send_remove(r);
589 
590 		free_inactive_rsb(r);
591 	}
592 }
593 
/* Allocate a new rsb for the given lockspace and resource name and
   initialize its spinlock and list heads.  Returns 0 with the rsb in
   *r_ret, or -ENOMEM if the allocation fails. */
597 
598 static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
599 			  struct dlm_rsb **r_ret)
600 {
601 	struct dlm_rsb *r;
602 
603 	r = dlm_allocate_rsb();
604 	if (!r)
605 		return -ENOMEM;
606 
607 	r->res_ls = ls;
608 	r->res_length = len;
609 	memcpy(r->res_name, name, len);
610 	spin_lock_init(&r->res_lock);
611 
612 	INIT_LIST_HEAD(&r->res_lookup);
613 	INIT_LIST_HEAD(&r->res_grantqueue);
614 	INIT_LIST_HEAD(&r->res_convertqueue);
615 	INIT_LIST_HEAD(&r->res_waitqueue);
616 	INIT_LIST_HEAD(&r->res_root_list);
617 	INIT_LIST_HEAD(&r->res_scan_list);
618 	INIT_LIST_HEAD(&r->res_recover_list);
619 	INIT_LIST_HEAD(&r->res_masters_list);
620 
621 	*r_ret = r;
622 	return 0;
623 }
624 
625 int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
626 			struct dlm_rsb **r_ret)
627 {
628 	char key[DLM_RESNAME_MAXLEN] = {};
629 
630 	memcpy(key, name, len);
631 	*r_ret = rhashtable_lookup_fast(rhash, &key, dlm_rhash_rsb_params);
632 	if (*r_ret)
633 		return 0;
634 
635 	return -EBADR;
636 }
637 
638 static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
639 {
640 	int rv;
641 
642 	rv = rhashtable_insert_fast(rhash, &rsb->res_node,
643 				    dlm_rhash_rsb_params);
644 	if (!rv)
645 		rsb_set_flag(rsb, RSB_HASHED);
646 
647 	return rv;
648 }
649 
650 /*
651  * Find rsb in rsbtbl and potentially create/add one
652  *
653  * Delaying the release of rsb's has a similar benefit to applications keeping
654  * NL locks on an rsb, but without the guarantee that the cached master value
655  * will still be valid when the rsb is reused.  Apps aren't always smart enough
656  * to keep NL locks on an rsb that they may lock again shortly; this can lead
657  * to excessive master lookups and removals if we don't delay the release.
658  *
659  * Searching for an rsb means looking through both the normal list and toss
660  * list.  When found on the toss list the rsb is moved to the normal list with
661  * ref count of 1; when found on normal list the ref count is incremented.
662  *
663  * rsb's on the keep list are being used locally and refcounted.
664  * rsb's on the toss list are not being used locally, and are not refcounted.
665  *
666  * The toss list rsb's were either
667  * - previously used locally but not any more (were on keep list, then
668  *   moved to toss list when last refcount dropped)
669  * - created and put on toss list as a directory record for a lookup
670  *   (we are the dir node for the res, but are not using the res right now,
671  *   but some other node is)
672  *
673  * The purpose of find_rsb() is to return a refcounted rsb for local use.
674  * So, if the given rsb is on the toss list, it is moved to the keep list
675  * before being returned.
676  *
677  * deactivate_rsb() happens when all local usage of the rsb is done, i.e. no
678  * more refcounts exist, so the rsb is moved from the keep list to the
679  * toss list.
680  *
681  * rsb's on both keep and toss lists are used for doing a name to master
682  * lookups.  rsb's that are in use locally (and being refcounted) are on
683  * the keep list, rsb's that are not in use locally (not refcounted) and
684  * only exist for name/master lookups are on the toss list.
685  *
 * rsb's on the toss list whose dir_nodeid is not local can have stale
687  * name/master mappings.  So, remote requests on such rsb's can potentially
688  * return with an error, which means the mapping is stale and needs to
689  * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
690  * first_lkid is to keep only a single outstanding request on an rsb
691  * while that rsb has a potentially stale master.)
692  */
693 
694 static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
695 			uint32_t hash, int dir_nodeid, int from_nodeid,
696 			unsigned int flags, struct dlm_rsb **r_ret)
697 {
698 	struct dlm_rsb *r = NULL;
699 	int our_nodeid = dlm_our_nodeid();
700 	int from_local = 0;
701 	int from_other = 0;
702 	int from_dir = 0;
703 	int create = 0;
704 	int error;
705 
706 	if (flags & R_RECEIVE_REQUEST) {
707 		if (from_nodeid == dir_nodeid)
708 			from_dir = 1;
709 		else
710 			from_other = 1;
711 	} else if (flags & R_REQUEST) {
712 		from_local = 1;
713 	}
714 
715 	/*
716 	 * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
717 	 * from_nodeid has sent us a lock in dlm_recover_locks, believing
718 	 * we're the new master.  Our local recovery may not have set
719 	 * res_master_nodeid to our_nodeid yet, so allow either.  Don't
720 	 * create the rsb; dlm_recover_process_copy() will handle EBADR
721 	 * by resending.
722 	 *
723 	 * If someone sends us a request, we are the dir node, and we do
724 	 * not find the rsb anywhere, then recreate it.  This happens if
725 	 * someone sends us a request after we have removed/freed an rsb.
726 	 * (They sent a request instead of lookup because they are using
727 	 * an rsb taken from their scan list.)
728 	 */
729 
730 	if (from_local || from_dir ||
731 	    (from_other && (dir_nodeid == our_nodeid))) {
732 		create = 1;
733 	}
734 
735  retry:
736 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
737 	if (error)
738 		goto do_new;
739 
740 	/* check if the rsb is active under read lock - likely path */
741 	read_lock_bh(&ls->ls_rsbtbl_lock);
742 	if (!rsb_flag(r, RSB_HASHED)) {
743 		read_unlock_bh(&ls->ls_rsbtbl_lock);
744 		goto do_new;
745 	}
746 
747 	/*
748 	 * rsb is active, so we can't check master_nodeid without lock_rsb.
749 	 */
750 
751 	if (rsb_flag(r, RSB_INACTIVE)) {
752 		read_unlock_bh(&ls->ls_rsbtbl_lock);
753 		goto do_inactive;
754 	}
755 
756 	kref_get(&r->res_ref);
757 	read_unlock_bh(&ls->ls_rsbtbl_lock);
758 	goto out;
759 
760 
761  do_inactive:
762 	write_lock_bh(&ls->ls_rsbtbl_lock);
763 
764 	/*
765 	 * The expectation here is that the rsb will have HASHED and
766 	 * INACTIVE flags set, and that the rsb can be moved from
767 	 * inactive back to active again.  However, between releasing
768 	 * the read lock and acquiring the write lock, this rsb could
769 	 * have been removed from rsbtbl, and had HASHED cleared, to
770 	 * be freed.  To deal with this case, we would normally need
771 	 * to repeat dlm_search_rsb_tree while holding the write lock,
772 	 * but rcu allows us to simply check the HASHED flag, because
773 	 * the rcu read lock means the rsb will not be freed yet.
774 	 * If the HASHED flag is not set, then the rsb is being freed,
775 	 * so we add a new rsb struct.  If the HASHED flag is set,
776 	 * and INACTIVE is not set, it means another thread has
777 	 * made the rsb active, as we're expecting to do here, and
778 	 * we just repeat the lookup (this will be very unlikely.)
779 	 */
780 	if (rsb_flag(r, RSB_HASHED)) {
781 		if (!rsb_flag(r, RSB_INACTIVE)) {
782 			write_unlock_bh(&ls->ls_rsbtbl_lock);
783 			goto retry;
784 		}
785 	} else {
786 		write_unlock_bh(&ls->ls_rsbtbl_lock);
787 		goto do_new;
788 	}
789 
790 	/*
791 	 * rsb found inactive (master_nodeid may be out of date unless
792 	 * we are the dir_nodeid or were the master)  No other thread
793 	 * is using this rsb because it's inactive, so we can
794 	 * look at or update res_master_nodeid without lock_rsb.
795 	 */
796 
797 	if ((r->res_master_nodeid != our_nodeid) && from_other) {
798 		/* our rsb was not master, and another node (not the dir node)
799 		   has sent us a request */
800 		log_debug(ls, "find_rsb inactive from_other %d master %d dir %d %s",
801 			  from_nodeid, r->res_master_nodeid, dir_nodeid,
802 			  r->res_name);
803 		write_unlock_bh(&ls->ls_rsbtbl_lock);
804 		error = -ENOTBLK;
805 		goto out;
806 	}
807 
808 	if ((r->res_master_nodeid != our_nodeid) && from_dir) {
809 		/* don't think this should ever happen */
810 		log_error(ls, "find_rsb inactive from_dir %d master %d",
811 			  from_nodeid, r->res_master_nodeid);
812 		dlm_print_rsb(r);
813 		/* fix it and go on */
814 		r->res_master_nodeid = our_nodeid;
815 		r->res_nodeid = 0;
816 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
817 		r->res_first_lkid = 0;
818 	}
819 
820 	if (from_local && (r->res_master_nodeid != our_nodeid)) {
821 		/* Because we have held no locks on this rsb,
822 		   res_master_nodeid could have become stale. */
823 		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
824 		r->res_first_lkid = 0;
825 	}
826 
827 	/* A dir record will not be on the scan list. */
828 	if (r->res_dir_nodeid != our_nodeid)
829 		del_scan(ls, r);
830 	list_move(&r->res_slow_list, &ls->ls_slow_active);
831 	rsb_clear_flag(r, RSB_INACTIVE);
832 	kref_init(&r->res_ref); /* ref is now used in active state */
833 	write_unlock_bh(&ls->ls_rsbtbl_lock);
834 
835 	goto out;
836 
837 
838  do_new:
839 	/*
840 	 * rsb not found
841 	 */
842 
843 	if (error == -EBADR && !create)
844 		goto out;
845 
846 	error = get_rsb_struct(ls, name, len, &r);
847 	if (WARN_ON_ONCE(error))
848 		goto out;
849 
850 	r->res_hash = hash;
851 	r->res_dir_nodeid = dir_nodeid;
852 	kref_init(&r->res_ref);
853 
854 	if (from_dir) {
855 		/* want to see how often this happens */
856 		log_debug(ls, "find_rsb new from_dir %d recreate %s",
857 			  from_nodeid, r->res_name);
858 		r->res_master_nodeid = our_nodeid;
859 		r->res_nodeid = 0;
860 		goto out_add;
861 	}
862 
863 	if (from_other && (dir_nodeid != our_nodeid)) {
864 		/* should never happen */
865 		log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
866 			  from_nodeid, dir_nodeid, our_nodeid, r->res_name);
867 		dlm_free_rsb(r);
868 		r = NULL;
869 		error = -ENOTBLK;
870 		goto out;
871 	}
872 
873 	if (from_other) {
874 		log_debug(ls, "find_rsb new from_other %d dir %d %s",
875 			  from_nodeid, dir_nodeid, r->res_name);
876 	}
877 
878 	if (dir_nodeid == our_nodeid) {
879 		/* When we are the dir nodeid, we can set the master
880 		   node immediately */
881 		r->res_master_nodeid = our_nodeid;
882 		r->res_nodeid = 0;
883 	} else {
884 		/* set_master will send_lookup to dir_nodeid */
885 		r->res_master_nodeid = 0;
886 		r->res_nodeid = -1;
887 	}
888 
889  out_add:
890 
891 	write_lock_bh(&ls->ls_rsbtbl_lock);
892 	error = rsb_insert(r, &ls->ls_rsbtbl);
893 	if (error == -EEXIST) {
894 		/* somebody else was faster and it seems the
895 		 * rsb exists now, we do a whole relookup
896 		 */
897 		write_unlock_bh(&ls->ls_rsbtbl_lock);
898 		dlm_free_rsb(r);
899 		goto retry;
900 	} else if (!error) {
901 		list_add(&r->res_slow_list, &ls->ls_slow_active);
902 	}
903 	write_unlock_bh(&ls->ls_rsbtbl_lock);
904  out:
905 	*r_ret = r;
906 	return error;
907 }
908 
/* During recovery, other nodes can send us new MSTCPY locks (from
   dlm_recover_locks) before we've made ourself master (in
   dlm_recover_masters).

   find_rsb_nodir() looks up the rsb for name/len in ls_rsbtbl and
   returns it through r_ret, creating it if it does not exist:

   - found active: a reference is taken with kref_get() under the rsbtbl
     read lock and 0 is returned.
   - found inactive: under the rsbtbl write lock the rsb is moved to
     ls_slow_active, RSB_INACTIVE is cleared, res_ref is re-initialized
     and the rsb is taken off the scan list.  If the request came from
     another node (from_nodeid set), we are not master and this is not
     R_RECEIVE_RECOVER, -ENOTBLK is returned instead.
   - not found: a new rsb is allocated with dir_nodeid as master and
     inserted; if somebody else inserted the same name first (-EEXIST)
     the new struct is freed and the whole lookup is retried.

   *r_ret is assigned on every exit, including the error exits.
   The caller visible in this file, find_rsb(), holds rcu_read_lock()
   around this call. */

static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
			  uint32_t hash, int dir_nodeid, int from_nodeid,
			  unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r = NULL;
	int our_nodeid = dlm_our_nodeid();
	int recover = (flags & R_RECEIVE_RECOVER);
	int error;

 retry:
	/* a non-zero return means no rsb with this name is in the table */
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
	if (error)
		goto do_new;

	/* check if the rsb is in active state under read lock - likely path */
	read_lock_bh(&ls->ls_rsbtbl_lock);
	if (!rsb_flag(r, RSB_HASHED)) {
		/* the rsb was removed from the table after the search */
		read_unlock_bh(&ls->ls_rsbtbl_lock);
		goto do_new;
	}

	if (rsb_flag(r, RSB_INACTIVE)) {
		read_unlock_bh(&ls->ls_rsbtbl_lock);
		goto do_inactive;
	}

	/*
	 * rsb is active, so we can't check master_nodeid without lock_rsb.
	 */

	kref_get(&r->res_ref);
	read_unlock_bh(&ls->ls_rsbtbl_lock);

	goto out;


 do_inactive:
	write_lock_bh(&ls->ls_rsbtbl_lock);

	/* See comment in find_rsb_dir.  The flags are re-checked because
	 * the read lock was dropped before taking the write lock.
	 */
	if (rsb_flag(r, RSB_HASHED)) {
		if (!rsb_flag(r, RSB_INACTIVE)) {
			/* became active in the meantime, redo the lookup */
			write_unlock_bh(&ls->ls_rsbtbl_lock);
			goto retry;
		}
	} else {
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		goto do_new;
	}


	/*
	 * rsb found inactive. No other thread is using this rsb because
	 * it's inactive, so we can look at or update res_master_nodeid
	 * without lock_rsb.
	 */

	if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
		/* our rsb is not master, and another node has sent us a
		   request; this should never happen */
		log_error(ls, "find_rsb inactive from_nodeid %d master %d dir %d",
			  from_nodeid, r->res_master_nodeid, dir_nodeid);
		dlm_print_rsb(r);
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		error = -ENOTBLK;
		goto out;
	}

	if (!recover && (r->res_master_nodeid != our_nodeid) &&
	    (dir_nodeid == our_nodeid)) {
		/* our rsb is not master, and we are dir; may as well fix it;
		   this should never happen */
		log_error(ls, "find_rsb inactive our %d master %d dir %d",
			  our_nodeid, r->res_master_nodeid, dir_nodeid);
		dlm_print_rsb(r);
		r->res_master_nodeid = our_nodeid;
		r->res_nodeid = 0;
	}

	/* reactivate: back on the active list with a fresh refcount */
	list_move(&r->res_slow_list, &ls->ls_slow_active);
	rsb_clear_flag(r, RSB_INACTIVE);
	kref_init(&r->res_ref);
	del_scan(ls, r);
	write_unlock_bh(&ls->ls_rsbtbl_lock);

	goto out;


 do_new:
	/*
	 * rsb not found
	 */

	error = get_rsb_struct(ls, name, len, &r);
	if (WARN_ON_ONCE(error))
		goto out;

	/* with no directory the node the hash maps to is also the master */
	r->res_hash = hash;
	r->res_dir_nodeid = dir_nodeid;
	r->res_master_nodeid = dir_nodeid;
	r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
	kref_init(&r->res_ref);

	write_lock_bh(&ls->ls_rsbtbl_lock);
	error = rsb_insert(r, &ls->ls_rsbtbl);
	if (error == -EEXIST) {
		/* somebody else was faster and it seems the
		 * rsb exists now, we do a whole relookup
		 */
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		dlm_free_rsb(r);
		goto retry;
	} else if (!error) {
		list_add(&r->res_slow_list, &ls->ls_slow_active);
	}
	write_unlock_bh(&ls->ls_rsbtbl_lock);

 out:
	*r_ret = r;
	return error;
}
1034 
1035 /*
1036  * rsb rcu usage
1037  *
1038  * While rcu read lock is held, the rsb cannot be freed,
1039  * which allows a lookup optimization.
1040  *
1041  * Two threads are accessing the same rsb concurrently,
1042  * the first (A) is trying to use the rsb, the second (B)
1043  * is trying to free the rsb.
1044  *
1045  * thread A                 thread B
1046  * (trying to use rsb)      (trying to free rsb)
1047  *
1048  * A1. rcu read lock
1049  * A2. rsbtbl read lock
1050  * A3. look up rsb in rsbtbl
1051  * A4. rsbtbl read unlock
1052  *                          B1. rsbtbl write lock
1053  *                          B2. look up rsb in rsbtbl
1054  *                          B3. remove rsb from rsbtbl
1055  *                          B4. clear rsb HASHED flag
1056  *                          B5. rsbtbl write unlock
1057  *                          B6. begin freeing rsb using rcu...
1058  *
1059  * (rsb is inactive, so try to make it active again)
1060  * A5. read rsb HASHED flag (safe because rsb is not freed yet)
 * A6. the rsb HASHED flag is not set, which means the rsb
 *     is being removed from rsbtbl and freed, so don't use it.
1063  * A7. rcu read unlock
1064  *
1065  *                          B7. ...finish freeing rsb using rcu
1066  * A8. create a new rsb
1067  *
1068  * Without the rcu optimization, steps A5-8 would need to do
1069  * an extra rsbtbl lookup:
1070  * A5. rsbtbl write lock
1071  * A6. look up rsb in rsbtbl, not found
1072  * A7. rsbtbl write unlock
1073  * A8. create a new rsb
1074  */
1075 
1076 static int find_rsb(struct dlm_ls *ls, const void *name, int len,
1077 		    int from_nodeid, unsigned int flags,
1078 		    struct dlm_rsb **r_ret)
1079 {
1080 	int dir_nodeid;
1081 	uint32_t hash;
1082 	int rv;
1083 
1084 	if (len > DLM_RESNAME_MAXLEN)
1085 		return -EINVAL;
1086 
1087 	hash = jhash(name, len, 0);
1088 	dir_nodeid = dlm_hash2nodeid(ls, hash);
1089 
1090 	rcu_read_lock();
1091 	if (dlm_no_directory(ls))
1092 		rv = find_rsb_nodir(ls, name, len, hash, dir_nodeid,
1093 				      from_nodeid, flags, r_ret);
1094 	else
1095 		rv = find_rsb_dir(ls, name, len, hash, dir_nodeid,
1096 				    from_nodeid, flags, r_ret);
1097 	rcu_read_unlock();
1098 	return rv;
1099 }
1100 
1101 /* we have received a request and found that res_master_nodeid != our_nodeid,
1102    so we need to return an error or make ourself the master */
1103 
1104 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
1105 				  int from_nodeid)
1106 {
1107 	if (dlm_no_directory(ls)) {
1108 		log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
1109 			  from_nodeid, r->res_master_nodeid,
1110 			  r->res_dir_nodeid);
1111 		dlm_print_rsb(r);
1112 		return -ENOTBLK;
1113 	}
1114 
1115 	if (from_nodeid != r->res_dir_nodeid) {
1116 		/* our rsb is not master, and another node (not the dir node)
1117 	   	   has sent us a request.  this is much more common when our
1118 	   	   master_nodeid is zero, so limit debug to non-zero.  */
1119 
1120 		if (r->res_master_nodeid) {
1121 			log_debug(ls, "validate master from_other %d master %d "
1122 				  "dir %d first %x %s", from_nodeid,
1123 				  r->res_master_nodeid, r->res_dir_nodeid,
1124 				  r->res_first_lkid, r->res_name);
1125 		}
1126 		return -ENOTBLK;
1127 	} else {
1128 		/* our rsb is not master, but the dir nodeid has sent us a
1129 	   	   request; this could happen with master 0 / res_nodeid -1 */
1130 
1131 		if (r->res_master_nodeid) {
1132 			log_error(ls, "validate master from_dir %d master %d "
1133 				  "first %x %s",
1134 				  from_nodeid, r->res_master_nodeid,
1135 				  r->res_first_lkid, r->res_name);
1136 		}
1137 
1138 		r->res_master_nodeid = dlm_our_nodeid();
1139 		r->res_nodeid = 0;
1140 		return 0;
1141 	}
1142 }
1143 
/*
 * Core of the master lookup for an rsb that already exists in the rsbtbl.
 *
 * Applies a series of fixups to the rsb's dir/master nodeid fields based
 * on the lookup flags (DLM_LU_RECOVER_MASTER -> fix_master,
 * DLM_LU_RECOVER_DIR -> from_master) and then reports the resulting
 * master through *r_nodeid.  If result is not NULL it is set to
 * DLM_LU_MATCH.
 *
 * is_inactive only tells this function in which state the caller found
 * the rsb; it is used for an error report.  Both callers in
 * _dlm_master_lookup() hold either lock_rsb (active rsb) or the rsbtbl
 * write lock (inactive rsb) around this call.
 */
static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
				int from_nodeid, bool is_inactive, unsigned int flags,
				int *r_nodeid, int *result)
{
	int fix_master = (flags & DLM_LU_RECOVER_MASTER);
	int from_master = (flags & DLM_LU_RECOVER_DIR);

	if (r->res_dir_nodeid != our_nodeid) {
		/* should not happen, but may as well fix it and carry on */
		log_error(ls, "%s res_dir %d our %d %s", __func__,
			  r->res_dir_nodeid, our_nodeid, r->res_name);
		r->res_dir_nodeid = our_nodeid;
	}

	if (fix_master && r->res_master_nodeid && dlm_is_removed(ls, r->res_master_nodeid)) {
		/* Recovery uses this function to set a new master when
		 * the previous master failed.  Setting NEW_MASTER will
		 * force dlm_recover_masters to call recover_master on this
		 * rsb even though the res_nodeid is no longer removed.
		 */

		r->res_master_nodeid = from_nodeid;
		r->res_nodeid = from_nodeid;
		rsb_set_flag(r, RSB_NEW_MASTER);

		if (is_inactive) {
			/* I don't think we should ever find it inactive. */
			log_error(ls, "%s fix_master inactive", __func__);
			dlm_dump_rsb(r);
		}
	}

	if (from_master && (r->res_master_nodeid != from_nodeid)) {
		/* this will happen if from_nodeid became master during
		 * a previous recovery cycle, and we aborted the previous
		 * cycle before recovering this master value
		 */

		log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
			  __func__, from_nodeid, r->res_master_nodeid,
			  r->res_nodeid, r->res_first_lkid, r->res_name);

		if (r->res_master_nodeid == our_nodeid) {
			/* we believe we are master ourselves; keep our value
			 * and report it back unchanged
			 */
			log_error(ls, "from_master %d our_master", from_nodeid);
			dlm_dump_rsb(r);
			goto ret_assign;
		}

		r->res_master_nodeid = from_nodeid;
		r->res_nodeid = from_nodeid;
		rsb_set_flag(r, RSB_NEW_MASTER);
	}

	if (!r->res_master_nodeid) {
		/* this will happen if recovery happens while we're looking
		 * up the master for this rsb
		 */

		log_debug(ls, "%s master 0 to %d first %x %s", __func__,
			  from_nodeid, r->res_first_lkid, r->res_name);
		r->res_master_nodeid = from_nodeid;
		r->res_nodeid = from_nodeid;
	}

	if (!from_master && !fix_master &&
	    (r->res_master_nodeid == from_nodeid)) {
		/* this can happen when the master sends remove, the dir node
		 * finds the rsb on the active list and ignores the remove,
		 * and the former master sends a lookup
		 */

		log_limit(ls, "%s from master %d flags %x first %x %s",
			  __func__, from_nodeid, flags, r->res_first_lkid,
			  r->res_name);
	}

 ret_assign:
	*r_nodeid = r->res_master_nodeid;
	if (result)
		*result = DLM_LU_MATCH;
}
1225 
/*
 * We're the dir node for this res and another node wants to know the
 * master nodeid.  During normal operation (non recovery) this is only
 * called from receive_lookup(); master lookups when the local node is
 * the dir node are done by find_rsb().
 *
 * normal operation, we are the dir node for a resource
 * . _request_lock
 * . set_master
 * . send_lookup
 * . receive_lookup
 * . dlm_master_lookup flags 0
 *
 * recover directory, we are rebuilding dir for all resources
 * . dlm_recover_directory
 * . dlm_rcom_names
 *   remote node sends back the rsb names it is master of and we are dir of
 * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
 *   we either create new rsb setting remote node as master, or find existing
 *   rsb and set master to be the remote node.
 *
 * recover masters, we are finding the new master for resources
 * . dlm_recover_masters
 * . recover_master
 * . dlm_send_rcom_lookup
 * . receive_rcom_lookup
 * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
 *
 * Return values:
 *   0        the master nodeid was stored in *r_nodeid; *result (when not
 *            NULL) is DLM_LU_MATCH for an existing rsb or DLM_LU_ADD when
 *            a new inactive dir record was created with from_nodeid as
 *            master.
 *   -EINVAL  len is larger than DLM_RESNAME_MAXLEN, the lookup came from
 *            our own nodeid, or we are not the dir node for the name
 *            (only in this last case *r_nodeid is set to -1).
 *   other    error from get_rsb_struct().
 *
 * The caller visible in this file, dlm_master_lookup(), holds
 * rcu_read_lock() around this call.
 */

static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
			      int len, unsigned int flags, int *r_nodeid, int *result)
{
	struct dlm_rsb *r = NULL;
	uint32_t hash;
	int our_nodeid = dlm_our_nodeid();
	int dir_nodeid, error;

	if (len > DLM_RESNAME_MAXLEN)
		return -EINVAL;

	if (from_nodeid == our_nodeid) {
		log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
			  our_nodeid, flags);
		return -EINVAL;
	}

	hash = jhash(name, len, 0);
	dir_nodeid = dlm_hash2nodeid(ls, hash);
	if (dir_nodeid != our_nodeid) {
		log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
			  from_nodeid, dir_nodeid, our_nodeid, hash,
			  ls->ls_num_nodes);
		*r_nodeid = -1;
		return -EINVAL;
	}

 retry:
	/* a non-zero return means no rsb with this name is in the table */
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
	if (error)
		goto not_found;

	/* check if the rsb is active under read lock - likely path */
	read_lock_bh(&ls->ls_rsbtbl_lock);
	if (!rsb_flag(r, RSB_HASHED)) {
		read_unlock_bh(&ls->ls_rsbtbl_lock);
		goto not_found;
	}

	if (rsb_flag(r, RSB_INACTIVE)) {
		read_unlock_bh(&ls->ls_rsbtbl_lock);
		goto do_inactive;
	}

	/* because the rsb is active, we need to lock_rsb before
	 * checking/changing res_master_nodeid
	 */

	hold_rsb(r);
	read_unlock_bh(&ls->ls_rsbtbl_lock);
	lock_rsb(r);

	__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
			    flags, r_nodeid, result);

	/* the rsb was active */
	unlock_rsb(r);
	put_rsb(r);

	return 0;

 do_inactive:
	/* unlikely path - check if still part of ls_rsbtbl */
	write_lock_bh(&ls->ls_rsbtbl_lock);

	/* see comment in find_rsb_dir */
	if (rsb_flag(r, RSB_HASHED)) {
		if (!rsb_flag(r, RSB_INACTIVE)) {
			write_unlock_bh(&ls->ls_rsbtbl_lock);
			/* something has changed, very unlikely but
			 * try again
			 */
			goto retry;
		}
	} else {
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		goto not_found;
	}

	/* because the rsb is inactive, it's not refcounted and lock_rsb
	   is not used, but is protected by the rsbtbl lock */

	__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
			    r_nodeid, result);

	/* A dir record rsb should never be on scan list. */
	/* Try to fix this with del_scan? */
	WARN_ON(!list_empty(&r->res_scan_list));

	write_unlock_bh(&ls->ls_rsbtbl_lock);

	return 0;

 not_found:
	error = get_rsb_struct(ls, name, len, &r);
	if (WARN_ON_ONCE(error))
		goto out;

	/* new dir record: we are dir, the asking node becomes master, and
	 * the rsb starts out inactive since nothing local is using it
	 */
	r->res_hash = hash;
	r->res_dir_nodeid = our_nodeid;
	r->res_master_nodeid = from_nodeid;
	r->res_nodeid = from_nodeid;
	rsb_set_flag(r, RSB_INACTIVE);

	write_lock_bh(&ls->ls_rsbtbl_lock);
	error = rsb_insert(r, &ls->ls_rsbtbl);
	if (error == -EEXIST) {
		/* somebody else was faster and it seems the
		 * rsb exists now, we do a whole relookup
		 */
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		dlm_free_rsb(r);
		goto retry;
	} else if (error) {
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		/* should never happen */
		dlm_free_rsb(r);
		goto retry;
	}

	list_add(&r->res_slow_list, &ls->ls_slow_inactive);
	write_unlock_bh(&ls->ls_rsbtbl_lock);

	if (result)
		*result = DLM_LU_ADD;
	*r_nodeid = from_nodeid;
 out:
	return error;
}
1384 
/*
 * Public entry point for a directory master lookup: runs
 * _dlm_master_lookup() with the rcu read lock held and hands back its
 * return value unchanged.
 */
int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
		      int len, unsigned int flags, int *r_nodeid, int *result)
{
	int error;

	rcu_read_lock();
	error = _dlm_master_lookup(ls, from_nodeid, name, len, flags,
				   r_nodeid, result);
	rcu_read_unlock();

	return error;
}
1394 
1395 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1396 {
1397 	struct dlm_rsb *r;
1398 
1399 	read_lock_bh(&ls->ls_rsbtbl_lock);
1400 	list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
1401 		if (r->res_hash == hash)
1402 			dlm_dump_rsb(r);
1403 	}
1404 	read_unlock_bh(&ls->ls_rsbtbl_lock);
1405 }
1406 
1407 void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
1408 {
1409 	struct dlm_rsb *r = NULL;
1410 	int error;
1411 
1412 	rcu_read_lock();
1413 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1414 	if (!error)
1415 		goto out;
1416 
1417 	dlm_dump_rsb(r);
1418  out:
1419 	rcu_read_unlock();
1420 }
1421 
/*
 * kref release function for an rsb's res_ref: instead of freeing the
 * rsb it is switched to the inactive state.  The rsb is flagged
 * RSB_INACTIVE, moved to ls_slow_inactive, possibly put on the scan
 * list, and its lvb buffer (if any) is freed.
 *
 * NOTE(review): list manipulation here presumably relies on the caller
 * holding the rsbtbl write lock - confirm against the put_rsb() path,
 * which is not visible in this part of the file.
 */
static void deactivate_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
	struct dlm_ls *ls = r->res_ls;
	int our_nodeid = dlm_our_nodeid();

	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
	rsb_set_flag(r, RSB_INACTIVE);
	list_move(&r->res_slow_list, &ls->ls_slow_inactive);

	/*
	 * When the rsb becomes unused:
	 * - If it's not a dir record for a remote master rsb,
	 *   then it is put on the scan list to be freed.
	 * - If it's a dir record for a remote master rsb,
	 *   then it is kept in the inactive state until
	 *   receive_remove() from the master node.
	 *
	 * NOTE(review): the condition below only calls add_scan() when a
	 * directory is in use and we are neither master nor dir node.  An
	 * rsb that we master ourselves, or any rsb when no directory is
	 * used, is also "not a dir record for a remote master" but is not
	 * added here - confirm whether that is intended.
	 */
	if (!dlm_no_directory(ls) &&
	    (r->res_master_nodeid != our_nodeid) &&
	    (dlm_dir_nodeid(r) != our_nodeid))
		add_scan(ls, r);

	/* an inactive rsb does not keep its lock value block */
	if (r->res_lvbptr) {
		dlm_free_lvb(r->res_lvbptr);
		r->res_lvbptr = NULL;
	}
}
1450 
/*
 * Free an rsb that is in the inactive state.
 *
 * Warns once if RSB_INACTIVE is not set, and asserts that the rsb is no
 * longer linked on any of its lookup, grant/convert/wait, root, scan,
 * recover or masters lists before handing it to dlm_free_rsb().
 */
void free_inactive_rsb(struct dlm_rsb *r)
{
	WARN_ON_ONCE(!rsb_flag(r, RSB_INACTIVE));

	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_scan_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r););

	dlm_free_rsb(r);
}
1466 
/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
   The rsb must exist as long as any lkb's for it do.

   attach_lkb() takes a reference on r with hold_rsb() and records r as
   the lkb's resource. */

static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}
1475 
1476 static void detach_lkb(struct dlm_lkb *lkb)
1477 {
1478 	if (lkb->lkb_resource) {
1479 		put_rsb(lkb->lkb_resource);
1480 		lkb->lkb_resource = NULL;
1481 	}
1482 }
1483 
1484 static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
1485 		       unsigned long start, unsigned long end)
1486 {
1487 	struct xa_limit limit;
1488 	struct dlm_lkb *lkb;
1489 	int rv;
1490 
1491 	limit.max = end;
1492 	limit.min = start;
1493 
1494 	lkb = dlm_allocate_lkb();
1495 	if (!lkb)
1496 		return -ENOMEM;
1497 
1498 	lkb->lkb_last_bast_cb_mode = DLM_LOCK_IV;
1499 	lkb->lkb_last_cast_cb_mode = DLM_LOCK_IV;
1500 	lkb->lkb_last_cb_mode = DLM_LOCK_IV;
1501 	lkb->lkb_nodeid = -1;
1502 	lkb->lkb_grmode = DLM_LOCK_IV;
1503 	kref_init(&lkb->lkb_ref);
1504 	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1505 	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1506 
1507 	write_lock_bh(&ls->ls_lkbxa_lock);
1508 	rv = xa_alloc(&ls->ls_lkbxa, &lkb->lkb_id, lkb, limit, GFP_ATOMIC);
1509 	write_unlock_bh(&ls->ls_lkbxa_lock);
1510 
1511 	if (rv < 0) {
1512 		log_error(ls, "create_lkb xa error %d", rv);
1513 		dlm_free_lkb(lkb);
1514 		return rv;
1515 	}
1516 
1517 	*lkb_ret = lkb;
1518 	return 0;
1519 }
1520 
/* Create a new lkb with an id allocated from the range 1..ULONG_MAX as
   passed to _create_lkb(); see there for the return values. */

static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	return _create_lkb(ls, lkb_ret, 1, ULONG_MAX);
}
1525 
1526 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1527 {
1528 	struct dlm_lkb *lkb;
1529 
1530 	rcu_read_lock();
1531 	lkb = xa_load(&ls->ls_lkbxa, lkid);
1532 	if (lkb) {
1533 		/* check if lkb is still part of lkbxa under lkbxa_lock as
1534 		 * the lkb_ref is tight to the lkbxa data structure, see
1535 		 * __put_lkb().
1536 		 */
1537 		read_lock_bh(&ls->ls_lkbxa_lock);
1538 		if (kref_read(&lkb->lkb_ref))
1539 			kref_get(&lkb->lkb_ref);
1540 		else
1541 			lkb = NULL;
1542 		read_unlock_bh(&ls->ls_lkbxa_lock);
1543 	}
1544 	rcu_read_unlock();
1545 
1546 	*lkb_ret = lkb;
1547 	return lkb ? 0 : -ENOENT;
1548 }
1549 
/* kref release function for lkb_ref, passed to
   dlm_kref_put_write_lock_bh() by __put_lkb().  It only asserts that the
   lkb is no longer on an rsb queue (lkb_status is 0); the actual
   teardown is done by __put_lkb(). */

static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}
1559 
1560 /* __put_lkb() is used when an lkb may not have an rsb attached to
1561    it so we need to provide the lockspace explicitly */
1562 
1563 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1564 {
1565 	uint32_t lkid = lkb->lkb_id;
1566 	int rv;
1567 
1568 	rv = dlm_kref_put_write_lock_bh(&lkb->lkb_ref, kill_lkb,
1569 					&ls->ls_lkbxa_lock);
1570 	if (rv) {
1571 		xa_erase(&ls->ls_lkbxa, lkid);
1572 		write_unlock_bh(&ls->ls_lkbxa_lock);
1573 
1574 		detach_lkb(lkb);
1575 
1576 		/* for local/process lkbs, lvbptr points to caller's lksb */
1577 		if (lkb->lkb_lvbptr && is_master_copy(lkb))
1578 			dlm_free_lvb(lkb->lkb_lvbptr);
1579 		dlm_free_lkb(lkb);
1580 	}
1581 
1582 	return rv;
1583 }
1584 
/* Drop a reference on an lkb that has an rsb attached.  The lockspace
   is taken from the rsb (both are asserted to be set) and the work is
   done by __put_lkb(), whose return value is passed through. */

int dlm_put_lkb(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls;

	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

	ls = lkb->lkb_resource->res_ls;
	return __put_lkb(ls, lkb);
}
1595 
/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking.
   It simply increments lkb_ref. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}
1603 
/* kref release function used by unhold_lkb().  Reaching it means
   unhold_lkb() dropped the last reference, which it must never do, so
   it prints the lkb and fails an assertion. */

static void unhold_lkb_assert(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	DLM_ASSERT(false, dlm_print_lkb(lkb););
}
1610 
/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking.
   If this does turn out to be the last reference, unhold_lkb_assert()
   is invoked by kref_put(). */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	kref_put(&lkb->lkb_ref, unhold_lkb_assert);
}
1620 
1621 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1622 			    int mode)
1623 {
1624 	struct dlm_lkb *lkb = NULL, *iter;
1625 
1626 	list_for_each_entry(iter, head, lkb_statequeue)
1627 		if (iter->lkb_rqmode < mode) {
1628 			lkb = iter;
1629 			list_add_tail(new, &iter->lkb_statequeue);
1630 			break;
1631 		}
1632 
1633 	if (!lkb)
1634 		list_add_tail(new, head);
1635 }
1636 
1637 /* add/remove lkb to rsb's grant/convert/wait queue */
1638 
/* Put an lkb on one of the rsb's queues, selected by status
   (DLM_LKSTS_WAITING, DLM_LKSTS_GRANTED or DLM_LKSTS_CONVERT).

   A reference is taken on the lkb for being on the queue, the lkb must
   not currently be on a queue (lkb_status == 0), and lkb_timestamp is
   set to the current time.  Any other status value fails an assertion. */

static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
	kref_get(&lkb->lkb_ref);

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

	lkb->lkb_timestamp = ktime_get();

	lkb->lkb_status = status;

	switch (status) {
	case DLM_LKSTS_WAITING:
		/* DLM_LKF_HEADQUE puts the lkb at the head of the queue */
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
		else
			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
		break;
	case DLM_LKSTS_GRANTED:
		/* convention says granted locks kept in order of grmode */
		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
				lkb->lkb_grmode);
		break;
	case DLM_LKSTS_CONVERT:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
		else
			list_add_tail(&lkb->lkb_statequeue,
				      &r->res_convertqueue);
		break;
	default:
		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
	}
}
1672 
/* Take an lkb off the rsb queue it is on: clears lkb_status, unlinks
   lkb_statequeue and drops the reference taken by add_lkb().  The rsb
   argument is not used by the body. */

static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}
1679 
/* Move an lkb from its current rsb queue to the queue for status sts,
   implemented as del_lkb() followed by add_lkb(). */

static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
}
1685 
1686 static int msg_reply_type(int mstype)
1687 {
1688 	switch (mstype) {
1689 	case DLM_MSG_REQUEST:
1690 		return DLM_MSG_REQUEST_REPLY;
1691 	case DLM_MSG_CONVERT:
1692 		return DLM_MSG_CONVERT_REPLY;
1693 	case DLM_MSG_UNLOCK:
1694 		return DLM_MSG_UNLOCK_REPLY;
1695 	case DLM_MSG_CANCEL:
1696 		return DLM_MSG_CANCEL_REPLY;
1697 	case DLM_MSG_LOOKUP:
1698 		return DLM_MSG_LOOKUP_REPLY;
1699 	}
1700 	return -1;
1701 }
1702 
1703 /* add/remove lkb from global waiters list of lkb's waiting for
1704    a reply from a remote node */
1705 
/* Record that the lkb is waiting for a reply to a message of type
   mstype sent to to_nodeid.  Runs under ls_waiters_lock.

   If the lkb is already waiting for a reply (or has an overlapping
   cancel flagged), an unlock/cancel is recorded as an overlap flag with
   an extra wait count and reference, and the lkb is not added to the
   list again.  Otherwise the wait type and nodeid are stored, a
   reference is taken and the lkb is added to ls_waiters. */

static void add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;

	spin_lock_bh(&ls->ls_waiters_lock);
	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
		switch (mstype) {
		case DLM_MSG_UNLOCK:
			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
			break;
		case DLM_MSG_CANCEL:
			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
			break;
		default:
			/* should never happen as validate_lock_args() checks
			 * on lkb_wait_type and validate_unlock_args() only
			 * creates UNLOCK or CANCEL messages.
			 */
			WARN_ON_ONCE(1);
			goto out;
		}
		/* one count and one reference per outstanding reply */
		lkb->lkb_wait_count++;
		hold_lkb(lkb);

		log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
			  lkb->lkb_wait_count, dlm_iflags_val(lkb));
		goto out;
	}

	DLM_ASSERT(!lkb->lkb_wait_count,
		   dlm_print_lkb(lkb);
		   printk("wait_count %d\n", lkb->lkb_wait_count););

	lkb->lkb_wait_count++;
	lkb->lkb_wait_type = mstype;
	lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
	hold_lkb(lkb);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
	spin_unlock_bh(&ls->ls_waiters_lock);
}
1748 
/* We clear the RESEND flag because we might be taking an lkb off the waiters
   list as part of process_requestqueue (e.g. a lookup that has an optimized
   request reply on the requestqueue) between dlm_recover_waiters_pre() which
   set RESEND and dlm_recover_waiters_post().

   _remove_from_waiters() accounts for a reply of type mstype for the lkb:
   it clears the matching overlap flag or wait type, decrements
   lkb_wait_count, drops the reference taken by add_to_waiters() and
   unlinks the lkb from the waiters list once the count reaches zero.
   ms may be NULL (see remove_from_waiters()).

   Returns 0 when the reply was accounted for, -1 when the lkb was not
   waiting for this reply.  Locking is left to the callers:
   remove_from_waiters() and remove_from_waiters_ms(). */

static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
				const struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int overlap_done = 0;

	if (mstype == DLM_MSG_UNLOCK_REPLY &&
	    test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
		log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
		overlap_done = 1;
		goto out_del;
	}

	if (mstype == DLM_MSG_CANCEL_REPLY &&
	    test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
		log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
		overlap_done = 1;
		goto out_del;
	}

	/* Cancel state was preemptively cleared by a successful convert,
	   see next comment, nothing to do. */

	if ((mstype == DLM_MSG_CANCEL_REPLY) &&
	    (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
		log_debug(ls, "remwait %x cancel_reply wait_type %d",
			  lkb->lkb_id, lkb->lkb_wait_type);
		return -1;
	}

	/* Remove for the convert reply, and preemptively remove for the
	   cancel reply.  A convert has been granted while there's still
	   an outstanding cancel on it (the cancel is moot and the result
	   in the cancel reply should be 0).  We preempt the cancel reply
	   because the app gets the convert result and then can follow up
	   with another op, like convert.  This subsequent op would see the
	   lingering state of the cancel and fail with -EBUSY. */

	if ((mstype == DLM_MSG_CONVERT_REPLY) &&
	    (lkb->lkb_wait_type == DLM_MSG_CONVERT) && ms && !ms->m_result &&
	    test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
		log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
			  lkb->lkb_id);
		lkb->lkb_wait_type = 0;
		/* this count/ref belongs to the cancel; the convert's own
		   count/ref is dropped at out_del */
		lkb->lkb_wait_count--;
		unhold_lkb(lkb);
		goto out_del;
	}

	/* N.B. type of reply may not always correspond to type of original
	   msg due to lookup->request optimization, verify others? */

	if (lkb->lkb_wait_type) {
		lkb->lkb_wait_type = 0;
		goto out_del;
	}

	log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
		  lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
		  lkb->lkb_remid, mstype, dlm_iflags_val(lkb));
	return -1;

 out_del:
	/* the force-unlock/cancel has completed and we haven't recvd a reply
	   to the op that was in progress prior to the unlock/cancel; we
	   give up on any reply to the earlier op.  FIXME: not sure when/how
	   this would happen */

	if (overlap_done && lkb->lkb_wait_type) {
		log_error(ls, "remwait error %x reply %d wait_type %d overlap",
			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
		lkb->lkb_wait_count--;
		unhold_lkb(lkb);
		lkb->lkb_wait_type = 0;
	}

	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

	clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
	lkb->lkb_wait_count--;
	if (!lkb->lkb_wait_count)
		list_del_init(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
	return 0;
}
1839 
1840 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1841 {
1842 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1843 	int error;
1844 
1845 	spin_lock_bh(&ls->ls_waiters_lock);
1846 	error = _remove_from_waiters(lkb, mstype, NULL);
1847 	spin_unlock_bh(&ls->ls_waiters_lock);
1848 	return error;
1849 }
1850 
1851 /* Handles situations where we might be processing a "fake" or "local" reply in
1852  * the recovery context which stops any locking activity. Only debugfs might
1853  * change the lockspace waiters but they will held the recovery lock to ensure
1854  * remove_from_waiters_ms() in local case will be the only user manipulating the
1855  * lockspace waiters in recovery context.
1856  */
1857 
1858 static int remove_from_waiters_ms(struct dlm_lkb *lkb,
1859 				  const struct dlm_message *ms, bool local)
1860 {
1861 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1862 	int error;
1863 
1864 	if (!local)
1865 		spin_lock_bh(&ls->ls_waiters_lock);
1866 	else
1867 		WARN_ON_ONCE(!rwsem_is_locked(&ls->ls_in_recovery) ||
1868 			     !dlm_locking_stopped(ls));
1869 	error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
1870 	if (!local)
1871 		spin_unlock_bh(&ls->ls_waiters_lock);
1872 	return error;
1873 }
1874 
1875 /* lkb is master or local copy */
1876 
/* Transfer the lock value block between rsb and lkb when a lock is
   granted or converted.  The direction is looked up in
   dlm_lvb_operations[] from the lkb's granted and requested modes (both
   offset by one for the table index).  Afterwards, if the rsb's lvb is
   marked not valid, that is reported in the lkb's status flags. */

static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int b, len = r->res_ls->ls_lvblen;

	/* b=1 lvb returned to caller
	   b=0 lvb written to rsb or invalidated
	   b=-1 do nothing */

	b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

	if (b == 1) {
		/* rsb -> lkb; needs a caller buffer, VALBLK and an rsb lvb */
		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			return;

		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
		lkb->lkb_lvbseq = r->res_lvbseq;

	} else if (b == 0) {
		/* lkb -> rsb, or invalidate the rsb's lvb on request */
		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
			rsb_set_flag(r, RSB_VALNOTVALID);
			return;
		}

		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		/* the rsb's lvb buffer is allocated on first use */
		if (!r->res_lvbptr)
			r->res_lvbptr = dlm_allocate_lvb(r->res_ls);

		if (!r->res_lvbptr)
			return;

		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
		r->res_lvbseq++;
		lkb->lkb_lvbseq = r->res_lvbseq;
		rsb_clear_flag(r, RSB_VALNOTVALID);
	}

	if (rsb_flag(r, RSB_VALNOTVALID))
		set_bit(DLM_SBF_VALNOTVALID_BIT, &lkb->lkb_sbflags);
}
1927 
1928 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1929 {
1930 	if (lkb->lkb_grmode < DLM_LOCK_PW)
1931 		return;
1932 
1933 	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1934 		rsb_set_flag(r, RSB_VALNOTVALID);
1935 		return;
1936 	}
1937 
1938 	if (!lkb->lkb_lvbptr)
1939 		return;
1940 
1941 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1942 		return;
1943 
1944 	if (!r->res_lvbptr)
1945 		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1946 
1947 	if (!r->res_lvbptr)
1948 		return;
1949 
1950 	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1951 	r->res_lvbseq++;
1952 	rsb_clear_flag(r, RSB_VALNOTVALID);
1953 }
1954 
1955 /* lkb is process copy (pc) */
1956 
1957 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1958 			    const struct dlm_message *ms)
1959 {
1960 	int b;
1961 
1962 	if (!lkb->lkb_lvbptr)
1963 		return;
1964 
1965 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1966 		return;
1967 
1968 	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1969 	if (b == 1) {
1970 		int len = receive_extralen(ms);
1971 		if (len > r->res_ls->ls_lvblen)
1972 			len = r->res_ls->ls_lvblen;
1973 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1974 		lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
1975 	}
1976 }
1977 
1978 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1979    remove_lock -- used for unlock, removes lkb from granted
1980    revert_lock -- used for cancel, moves lkb from convert to granted
1981    grant_lock  -- used for request and convert, adds lkb to granted or
1982                   moves lkb from convert or waiting to granted
1983 
1984    Each of these is used for master or local copy lkb's.  There is
1985    also a _pc() variation used to make the corresponding change on
1986    a process copy (pc) lkb. */
1987 
1988 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1989 {
1990 	del_lkb(r, lkb);
1991 	lkb->lkb_grmode = DLM_LOCK_IV;
1992 	/* this unhold undoes the original ref from create_lkb()
1993 	   so this leads to the lkb being freed */
1994 	unhold_lkb(lkb);
1995 }
1996 
/*
 * Unlock path for an lkb: let set_lvb_unlock() update the rsb's lvb state
 * first, then remove the lkb via _remove_lock().
 */
static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);
	_remove_lock(r, lkb);
}
2002 
/*
 * Process copy variant of remove_lock(): removes the lkb without touching
 * the rsb's lvb.
 */
static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}
2007 
2008 /* returns: 0 did nothing
2009 	    1 moved lock to granted
2010 	   -1 removed lock */
2011 
2012 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2013 {
2014 	int rv = 0;
2015 
2016 	lkb->lkb_rqmode = DLM_LOCK_IV;
2017 
2018 	switch (lkb->lkb_status) {
2019 	case DLM_LKSTS_GRANTED:
2020 		break;
2021 	case DLM_LKSTS_CONVERT:
2022 		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2023 		rv = 1;
2024 		break;
2025 	case DLM_LKSTS_WAITING:
2026 		del_lkb(r, lkb);
2027 		lkb->lkb_grmode = DLM_LOCK_IV;
2028 		/* this unhold undoes the original ref from create_lkb()
2029 		   so this leads to the lkb being freed */
2030 		unhold_lkb(lkb);
2031 		rv = -1;
2032 		break;
2033 	default:
2034 		log_print("invalid status for revert %d", lkb->lkb_status);
2035 	}
2036 	return rv;
2037 }
2038 
/*
 * Process copy variant of revert_lock(); currently just forwards to it and
 * returns its result unchanged.
 */
static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return revert_lock(r, lkb);
}
2043 
2044 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2045 {
2046 	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2047 		lkb->lkb_grmode = lkb->lkb_rqmode;
2048 		if (lkb->lkb_status)
2049 			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2050 		else
2051 			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2052 	}
2053 
2054 	lkb->lkb_rqmode = DLM_LOCK_IV;
2055 	lkb->lkb_highbast = 0;
2056 }
2057 
/*
 * Grant an lkb: run the lvb handling in set_lvb_lock() first, then do the
 * mode/queue update in _grant_lock().
 */
static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_lock(r, lkb);
	_grant_lock(r, lkb);
}
2063 
/*
 * Process copy variant of grant_lock(): the lvb handling is done by
 * set_lvb_lock_pc() using message ms, followed by _grant_lock().
 */
static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  const struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);
	_grant_lock(r, lkb);
}
2070 
2071 /* called by grant_pending_locks() which means an async grant message must
2072    be sent to the requesting node in addition to granting the lock if the
2073    lkb belongs to a remote node. */
2074 
static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);

	/* not a master copy: queue the completion callback here */
	if (!is_master_copy(lkb)) {
		queue_cast(r, lkb, 0);
		return;
	}

	/* master copy: send a grant message for it */
	send_grant(r, lkb);
}
2083 
2084 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2085    change the granted/requested modes.  We're munging things accordingly in
2086    the process copy.
2087    CONVDEADLK: our grmode may have been forced down to NL to resolve a
2088    conversion deadlock
2089    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2090    compatible with other granted locks */
2091 
2092 static void munge_demoted(struct dlm_lkb *lkb)
2093 {
2094 	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2095 		log_print("munge_demoted %x invalid modes gr %d rq %d",
2096 			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2097 		return;
2098 	}
2099 
2100 	lkb->lkb_grmode = DLM_LOCK_NL;
2101 }
2102 
2103 static void munge_altmode(struct dlm_lkb *lkb, const struct dlm_message *ms)
2104 {
2105 	if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
2106 	    ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
2107 		log_print("munge_altmode %x invalid reply type %d",
2108 			  lkb->lkb_id, le32_to_cpu(ms->m_type));
2109 		return;
2110 	}
2111 
2112 	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2113 		lkb->lkb_rqmode = DLM_LOCK_PR;
2114 	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2115 		lkb->lkb_rqmode = DLM_LOCK_CW;
2116 	else {
2117 		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2118 		dlm_print_lkb(lkb);
2119 	}
2120 }
2121 
2122 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2123 {
2124 	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2125 					   lkb_statequeue);
2126 	if (lkb->lkb_id == first->lkb_id)
2127 		return 1;
2128 
2129 	return 0;
2130 }
2131 
2132 /* Check if the given lkb conflicts with another lkb on the queue. */
2133 
2134 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2135 {
2136 	struct dlm_lkb *this;
2137 
2138 	list_for_each_entry(this, head, lkb_statequeue) {
2139 		if (this == lkb)
2140 			continue;
2141 		if (!modes_compat(this, lkb))
2142 			return 1;
2143 	}
2144 	return 0;
2145 }
2146 
2147 /*
2148  * "A conversion deadlock arises with a pair of lock requests in the converting
2149  * queue for one resource.  The granted mode of each lock blocks the requested
2150  * mode of the other lock."
2151  *
2152  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2153  * convert queue from being granted, then deadlk/demote lkb.
2154  *
2155  * Example:
2156  * Granted Queue: empty
2157  * Convert Queue: NL->EX (first lock)
2158  *                PR->EX (second lock)
2159  *
2160  * The first lock can't be granted because of the granted mode of the second
2161  * lock and the second lock can't be granted because it's not first in the
2162  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2163  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2164  * flag set and return DEMOTED in the lksb flags.
2165  *
2166  * Originally, this function detected conv-deadlk in a more limited scope:
2167  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2168  * - if lkb1 was the first entry in the queue (not just earlier), and was
2169  *   blocked by the granted mode of lkb2, and there was nothing on the
2170  *   granted queue preventing lkb1 from being granted immediately, i.e.
2171  *   lkb2 was the only thing preventing lkb1 from being granted.
2172  *
2173  * That second condition meant we'd only say there was conv-deadlk if
2174  * resolving it (by demotion) would lead to the first lock on the convert
2175  * queue being granted right away.  It allowed conversion deadlocks to exist
2176  * between locks on the convert queue while they couldn't be granted anyway.
2177  *
2178  * Now, we detect and take action on conversion deadlocks immediately when
2179  * they're created, even if they may not be immediately consequential.  If
2180  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2181  * mode that would prevent lkb1's conversion from being granted, we do a
2182  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2183  * I think this means that the lkb_is_ahead condition below should always
2184  * be zero, i.e. there will never be conv-deadlk between two locks that are
2185  * both already on the convert queue.
2186  */
2187 
2188 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2189 {
2190 	struct dlm_lkb *lkb1;
2191 	int lkb_is_ahead = 0;
2192 
2193 	list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2194 		if (lkb1 == lkb2) {
2195 			lkb_is_ahead = 1;
2196 			continue;
2197 		}
2198 
2199 		if (!lkb_is_ahead) {
2200 			if (!modes_compat(lkb2, lkb1))
2201 				return 1;
2202 		} else {
2203 			if (!modes_compat(lkb2, lkb1) &&
2204 			    !modes_compat(lkb1, lkb2))
2205 				return 1;
2206 		}
2207 	}
2208 	return 0;
2209 }
2210 
2211 /*
2212  * Return 1 if the lock can be granted, 0 otherwise.
2213  * Also detect and resolve conversion deadlocks.
2214  *
2215  * lkb is the lock to be granted
2216  *
2217  * now is 1 if the function is being called in the context of the
2218  * immediate request, it is 0 if called later, after the lock has been
2219  * queued.
2220  *
2221  * recover is 1 if dlm_recover_grant() is trying to grant conversions
2222  * after recovery.
2223  *
2224  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2225  */
2226 
2227 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2228 			   int recover)
2229 {
2230 	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2231 
2232 	/*
2233 	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
2234 	 * a new request for a NL mode lock being blocked.
2235 	 *
2236 	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2237 	 * request, then it would be granted.  In essence, the use of this flag
2238 	 * tells the Lock Manager to expedite theis request by not considering
2239 	 * what may be in the CONVERTING or WAITING queues...  As of this
2240 	 * writing, the EXPEDITE flag can be used only with new requests for NL
2241 	 * mode locks.  This flag is not valid for conversion requests.
2242 	 *
2243 	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
2244 	 * conversion or used with a non-NL requested mode.  We also know an
2245 	 * EXPEDITE request is always granted immediately, so now must always
2246 	 * be 1.  The full condition to grant an expedite request: (now &&
2247 	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2248 	 * therefore be shortened to just checking the flag.
2249 	 */
2250 
2251 	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2252 		return 1;
2253 
2254 	/*
2255 	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2256 	 * added to the remaining conditions.
2257 	 */
2258 
2259 	if (queue_conflict(&r->res_grantqueue, lkb))
2260 		return 0;
2261 
2262 	/*
2263 	 * 6-3: By default, a conversion request is immediately granted if the
2264 	 * requested mode is compatible with the modes of all other granted
2265 	 * locks
2266 	 */
2267 
2268 	if (queue_conflict(&r->res_convertqueue, lkb))
2269 		return 0;
2270 
2271 	/*
2272 	 * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2273 	 * locks for a recovered rsb, on which lkb's have been rebuilt.
2274 	 * The lkb's may have been rebuilt on the queues in a different
2275 	 * order than they were in on the previous master.  So, granting
2276 	 * queued conversions in order after recovery doesn't make sense
2277 	 * since the order hasn't been preserved anyway.  The new order
2278 	 * could also have created a new "in place" conversion deadlock.
2279 	 * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2280 	 * After recovery, there would be no granted locks, and possibly
2281 	 * NL->EX, PR->EX, an in-place conversion deadlock.)  So, after
2282 	 * recovery, grant conversions without considering order.
2283 	 */
2284 
2285 	if (conv && recover)
2286 		return 1;
2287 
2288 	/*
2289 	 * 6-5: But the default algorithm for deciding whether to grant or
2290 	 * queue conversion requests does not by itself guarantee that such
2291 	 * requests are serviced on a "first come first serve" basis.  This, in
2292 	 * turn, can lead to a phenomenon known as "indefinate postponement".
2293 	 *
2294 	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
2295 	 * the system service employed to request a lock conversion.  This flag
2296 	 * forces certain conversion requests to be queued, even if they are
2297 	 * compatible with the granted modes of other locks on the same
2298 	 * resource.  Thus, the use of this flag results in conversion requests
2299 	 * being ordered on a "first come first servce" basis.
2300 	 *
2301 	 * DCT: This condition is all about new conversions being able to occur
2302 	 * "in place" while the lock remains on the granted queue (assuming
2303 	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
2304 	 * doesn't _have_ to go onto the convert queue where it's processed in
2305 	 * order.  The "now" variable is necessary to distinguish converts
2306 	 * being received and processed for the first time now, because once a
2307 	 * convert is moved to the conversion queue the condition below applies
2308 	 * requiring fifo granting.
2309 	 */
2310 
2311 	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2312 		return 1;
2313 
2314 	/*
2315 	 * Even if the convert is compat with all granted locks,
2316 	 * QUECVT forces it behind other locks on the convert queue.
2317 	 */
2318 
2319 	if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2320 		if (list_empty(&r->res_convertqueue))
2321 			return 1;
2322 		else
2323 			return 0;
2324 	}
2325 
2326 	/*
2327 	 * The NOORDER flag is set to avoid the standard vms rules on grant
2328 	 * order.
2329 	 */
2330 
2331 	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2332 		return 1;
2333 
2334 	/*
2335 	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2336 	 * granted until all other conversion requests ahead of it are granted
2337 	 * and/or canceled.
2338 	 */
2339 
2340 	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2341 		return 1;
2342 
2343 	/*
2344 	 * 6-4: By default, a new request is immediately granted only if all
2345 	 * three of the following conditions are satisfied when the request is
2346 	 * issued:
2347 	 * - The queue of ungranted conversion requests for the resource is
2348 	 *   empty.
2349 	 * - The queue of ungranted new requests for the resource is empty.
2350 	 * - The mode of the new request is compatible with the most
2351 	 *   restrictive mode of all granted locks on the resource.
2352 	 */
2353 
2354 	if (now && !conv && list_empty(&r->res_convertqueue) &&
2355 	    list_empty(&r->res_waitqueue))
2356 		return 1;
2357 
2358 	/*
2359 	 * 6-4: Once a lock request is in the queue of ungranted new requests,
2360 	 * it cannot be granted until the queue of ungranted conversion
2361 	 * requests is empty, all ungranted new requests ahead of it are
2362 	 * granted and/or canceled, and it is compatible with the granted mode
2363 	 * of the most restrictive lock granted on the resource.
2364 	 */
2365 
2366 	if (!now && !conv && list_empty(&r->res_convertqueue) &&
2367 	    first_in_list(lkb, &r->res_waitqueue))
2368 		return 1;
2369 
2370 	return 0;
2371 }
2372 
2373 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2374 			  int recover, int *err)
2375 {
2376 	int rv;
2377 	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2378 	int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2379 
2380 	if (err)
2381 		*err = 0;
2382 
2383 	rv = _can_be_granted(r, lkb, now, recover);
2384 	if (rv)
2385 		goto out;
2386 
2387 	/*
2388 	 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2389 	 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2390 	 * cancels one of the locks.
2391 	 */
2392 
2393 	if (is_convert && can_be_queued(lkb) &&
2394 	    conversion_deadlock_detect(r, lkb)) {
2395 		if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2396 			lkb->lkb_grmode = DLM_LOCK_NL;
2397 			set_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
2398 		} else if (err) {
2399 			*err = -EDEADLK;
2400 		} else {
2401 			log_print("can_be_granted deadlock %x now %d",
2402 				  lkb->lkb_id, now);
2403 			dlm_dump_rsb(r);
2404 		}
2405 		goto out;
2406 	}
2407 
2408 	/*
2409 	 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2410 	 * to grant a request in a mode other than the normal rqmode.  It's a
2411 	 * simple way to provide a big optimization to applications that can
2412 	 * use them.
2413 	 */
2414 
2415 	if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2416 		alt = DLM_LOCK_PR;
2417 	else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2418 		alt = DLM_LOCK_CW;
2419 
2420 	if (alt) {
2421 		lkb->lkb_rqmode = alt;
2422 		rv = _can_be_granted(r, lkb, now, 0);
2423 		if (rv)
2424 			set_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
2425 		else
2426 			lkb->lkb_rqmode = rqmode;
2427 	}
2428  out:
2429 	return rv;
2430 }
2431 
2432 /* Returns the highest requested mode of all blocked conversions; sets
2433    cw if there's a blocked conversion to DLM_LOCK_CW. */
2434 
2435 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2436 				 unsigned int *count)
2437 {
2438 	struct dlm_lkb *lkb, *s;
2439 	int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2440 	int hi, demoted, quit, grant_restart, demote_restart;
2441 	int deadlk;
2442 
2443 	quit = 0;
2444  restart:
2445 	grant_restart = 0;
2446 	demote_restart = 0;
2447 	hi = DLM_LOCK_IV;
2448 
2449 	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2450 		demoted = is_demoted(lkb);
2451 		deadlk = 0;
2452 
2453 		if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2454 			grant_lock_pending(r, lkb);
2455 			grant_restart = 1;
2456 			if (count)
2457 				(*count)++;
2458 			continue;
2459 		}
2460 
2461 		if (!demoted && is_demoted(lkb)) {
2462 			log_print("WARN: pending demoted %x node %d %s",
2463 				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2464 			demote_restart = 1;
2465 			continue;
2466 		}
2467 
2468 		if (deadlk) {
2469 			/*
2470 			 * If DLM_LKB_NODLKWT flag is set and conversion
2471 			 * deadlock is detected, we request blocking AST and
2472 			 * down (or cancel) conversion.
2473 			 */
2474 			if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2475 				if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2476 					queue_bast(r, lkb, lkb->lkb_rqmode);
2477 					lkb->lkb_highbast = lkb->lkb_rqmode;
2478 				}
2479 			} else {
2480 				log_print("WARN: pending deadlock %x node %d %s",
2481 					  lkb->lkb_id, lkb->lkb_nodeid,
2482 					  r->res_name);
2483 				dlm_dump_rsb(r);
2484 			}
2485 			continue;
2486 		}
2487 
2488 		hi = max_t(int, lkb->lkb_rqmode, hi);
2489 
2490 		if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2491 			*cw = 1;
2492 	}
2493 
2494 	if (grant_restart)
2495 		goto restart;
2496 	if (demote_restart && !quit) {
2497 		quit = 1;
2498 		goto restart;
2499 	}
2500 
2501 	return max_t(int, high, hi);
2502 }
2503 
2504 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2505 			      unsigned int *count)
2506 {
2507 	struct dlm_lkb *lkb, *s;
2508 
2509 	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2510 		if (can_be_granted(r, lkb, 0, 0, NULL)) {
2511 			grant_lock_pending(r, lkb);
2512 			if (count)
2513 				(*count)++;
2514 		} else {
2515 			high = max_t(int, lkb->lkb_rqmode, high);
2516 			if (lkb->lkb_rqmode == DLM_LOCK_CW)
2517 				*cw = 1;
2518 		}
2519 	}
2520 
2521 	return high;
2522 }
2523 
2524 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2525    on either the convert or waiting queue.
2526    high is the largest rqmode of all locks blocked on the convert or
2527    waiting queue. */
2528 
2529 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2530 {
2531 	if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2532 		if (gr->lkb_highbast < DLM_LOCK_EX)
2533 			return 1;
2534 		return 0;
2535 	}
2536 
2537 	if (gr->lkb_highbast < high &&
2538 	    !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2539 		return 1;
2540 	return 0;
2541 }
2542 
2543 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2544 {
2545 	struct dlm_lkb *lkb, *s;
2546 	int high = DLM_LOCK_IV;
2547 	int cw = 0;
2548 
2549 	if (!is_master(r)) {
2550 		log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2551 		dlm_dump_rsb(r);
2552 		return;
2553 	}
2554 
2555 	high = grant_pending_convert(r, high, &cw, count);
2556 	high = grant_pending_wait(r, high, &cw, count);
2557 
2558 	if (high == DLM_LOCK_IV)
2559 		return;
2560 
2561 	/*
2562 	 * If there are locks left on the wait/convert queue then send blocking
2563 	 * ASTs to granted locks based on the largest requested mode (high)
2564 	 * found above.
2565 	 */
2566 
2567 	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2568 		if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2569 			if (cw && high == DLM_LOCK_PR &&
2570 			    lkb->lkb_grmode == DLM_LOCK_PR)
2571 				queue_bast(r, lkb, DLM_LOCK_CW);
2572 			else
2573 				queue_bast(r, lkb, high);
2574 			lkb->lkb_highbast = high;
2575 		}
2576 	}
2577 }
2578 
2579 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2580 {
2581 	if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2582 	    (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2583 		if (gr->lkb_highbast < DLM_LOCK_EX)
2584 			return 1;
2585 		return 0;
2586 	}
2587 
2588 	if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2589 		return 1;
2590 	return 0;
2591 }
2592 
2593 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2594 			    struct dlm_lkb *lkb)
2595 {
2596 	struct dlm_lkb *gr;
2597 
2598 	list_for_each_entry(gr, head, lkb_statequeue) {
2599 		/* skip self when sending basts to convertqueue */
2600 		if (gr == lkb)
2601 			continue;
2602 		if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2603 			queue_bast(r, gr, lkb->lkb_rqmode);
2604 			gr->lkb_highbast = lkb->lkb_rqmode;
2605 		}
2606 	}
2607 }
2608 
2609 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2610 {
2611 	send_bast_queue(r, &r->res_grantqueue, lkb);
2612 }
2613 
2614 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2615 {
2616 	send_bast_queue(r, &r->res_grantqueue, lkb);
2617 	send_bast_queue(r, &r->res_convertqueue, lkb);
2618 }
2619 
2620 /* set_master(r, lkb) -- set the master nodeid of a resource
2621 
2622    The purpose of this function is to set the nodeid field in the given
2623    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2624    known, it can just be copied to the lkb and the function will return
2625    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2626    before it can be copied to the lkb.
2627 
2628    When the rsb nodeid is being looked up remotely, the initial lkb
2629    causing the lookup is kept on the ls_waiters list waiting for the
2630    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2631    on the rsb's res_lookup list until the master is verified.
2632 
2633    Return values:
2634    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2635    1: the rsb master is not available and the lkb has been placed on
2636       a wait queue
2637 */
2638 
2639 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2640 {
2641 	int our_nodeid = dlm_our_nodeid();
2642 
2643 	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2644 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2645 		r->res_first_lkid = lkb->lkb_id;
2646 		lkb->lkb_nodeid = r->res_nodeid;
2647 		return 0;
2648 	}
2649 
2650 	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2651 		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2652 		return 1;
2653 	}
2654 
2655 	if (r->res_master_nodeid == our_nodeid) {
2656 		lkb->lkb_nodeid = 0;
2657 		return 0;
2658 	}
2659 
2660 	if (r->res_master_nodeid) {
2661 		lkb->lkb_nodeid = r->res_master_nodeid;
2662 		return 0;
2663 	}
2664 
2665 	if (dlm_dir_nodeid(r) == our_nodeid) {
2666 		/* This is a somewhat unusual case; find_rsb will usually
2667 		   have set res_master_nodeid when dir nodeid is local, but
2668 		   there are cases where we become the dir node after we've
2669 		   past find_rsb and go through _request_lock again.
2670 		   confirm_master() or process_lookup_list() needs to be
2671 		   called after this. */
2672 		log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2673 			  lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2674 			  r->res_name);
2675 		r->res_master_nodeid = our_nodeid;
2676 		r->res_nodeid = 0;
2677 		lkb->lkb_nodeid = 0;
2678 		return 0;
2679 	}
2680 
2681 	r->res_first_lkid = lkb->lkb_id;
2682 	send_lookup(r, lkb);
2683 	return 1;
2684 }
2685 
2686 static void process_lookup_list(struct dlm_rsb *r)
2687 {
2688 	struct dlm_lkb *lkb, *safe;
2689 
2690 	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2691 		list_del_init(&lkb->lkb_rsb_lookup);
2692 		_request_lock(r, lkb);
2693 	}
2694 }
2695 
2696 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2697 
2698 static void confirm_master(struct dlm_rsb *r, int error)
2699 {
2700 	struct dlm_lkb *lkb;
2701 
2702 	if (!r->res_first_lkid)
2703 		return;
2704 
2705 	switch (error) {
2706 	case 0:
2707 	case -EINPROGRESS:
2708 		r->res_first_lkid = 0;
2709 		process_lookup_list(r);
2710 		break;
2711 
2712 	case -EAGAIN:
2713 	case -EBADR:
2714 	case -ENOTBLK:
2715 		/* the remote request failed and won't be retried (it was
2716 		   a NOQUEUE, or has been canceled/unlocked); make a waiting
2717 		   lkb the first_lkid */
2718 
2719 		r->res_first_lkid = 0;
2720 
2721 		if (!list_empty(&r->res_lookup)) {
2722 			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2723 					 lkb_rsb_lookup);
2724 			list_del_init(&lkb->lkb_rsb_lookup);
2725 			r->res_first_lkid = lkb->lkb_id;
2726 			_request_lock(r, lkb);
2727 		}
2728 		break;
2729 
2730 	default:
2731 		log_error(r->res_ls, "confirm_master unknown error %d", error);
2732 	}
2733 }
2734 
2735 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2736 			 int namelen, void (*ast)(void *astparam),
2737 			 void *astparam,
2738 			 void (*bast)(void *astparam, int mode),
2739 			 struct dlm_args *args)
2740 {
2741 	int rv = -EINVAL;
2742 
2743 	/* check for invalid arg usage */
2744 
2745 	if (mode < 0 || mode > DLM_LOCK_EX)
2746 		goto out;
2747 
2748 	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2749 		goto out;
2750 
2751 	if (flags & DLM_LKF_CANCEL)
2752 		goto out;
2753 
2754 	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2755 		goto out;
2756 
2757 	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2758 		goto out;
2759 
2760 	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2761 		goto out;
2762 
2763 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2764 		goto out;
2765 
2766 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2767 		goto out;
2768 
2769 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2770 		goto out;
2771 
2772 	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2773 		goto out;
2774 
2775 	if (!ast || !lksb)
2776 		goto out;
2777 
2778 	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2779 		goto out;
2780 
2781 	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2782 		goto out;
2783 
2784 	/* these args will be copied to the lkb in validate_lock_args,
2785 	   it cannot be done now because when converting locks, fields in
2786 	   an active lkb cannot be modified before locking the rsb */
2787 
2788 	args->flags = flags;
2789 	args->astfn = ast;
2790 	args->astparam = astparam;
2791 	args->bastfn = bast;
2792 	args->mode = mode;
2793 	args->lksb = lksb;
2794 	rv = 0;
2795  out:
2796 	return rv;
2797 }
2798 
2799 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2800 {
2801 	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2802  		      DLM_LKF_FORCEUNLOCK))
2803 		return -EINVAL;
2804 
2805 	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2806 		return -EINVAL;
2807 
2808 	args->flags = flags;
2809 	args->astparam = astarg;
2810 	return 0;
2811 }
2812 
2813 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2814 			      struct dlm_args *args)
2815 {
2816 	int rv = -EBUSY;
2817 
2818 	if (args->flags & DLM_LKF_CONVERT) {
2819 		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2820 			goto out;
2821 
2822 		/* lock not allowed if there's any op in progress */
2823 		if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2824 			goto out;
2825 
2826 		if (is_overlap(lkb))
2827 			goto out;
2828 
2829 		rv = -EINVAL;
2830 		if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
2831 			goto out;
2832 
2833 		if (args->flags & DLM_LKF_QUECVT &&
2834 		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2835 			goto out;
2836 	}
2837 
2838 	lkb->lkb_exflags = args->flags;
2839 	dlm_set_sbflags_val(lkb, 0);
2840 	lkb->lkb_astfn = args->astfn;
2841 	lkb->lkb_astparam = args->astparam;
2842 	lkb->lkb_bastfn = args->bastfn;
2843 	lkb->lkb_rqmode = args->mode;
2844 	lkb->lkb_lksb = args->lksb;
2845 	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2846 	lkb->lkb_ownpid = (int) current->pid;
2847 	rv = 0;
2848  out:
2849 	switch (rv) {
2850 	case 0:
2851 		break;
2852 	case -EINVAL:
2853 		/* annoy the user because dlm usage is wrong */
2854 		WARN_ON(1);
2855 		log_error(ls, "%s %d %x %x %x %d %d", __func__,
2856 			  rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2857 			  lkb->lkb_status, lkb->lkb_wait_type);
2858 		break;
2859 	default:
2860 		log_debug(ls, "%s %d %x %x %x %d %d", __func__,
2861 			  rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2862 			  lkb->lkb_status, lkb->lkb_wait_type);
2863 		break;
2864 	}
2865 
2866 	return rv;
2867 }
2868 
2869 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2870    for success */
2871 
2872 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2873    because there may be a lookup in progress and it's valid to do
2874    cancel/unlockf on it */
2875 
/*
 * Decide whether the unlock, force-unlock or cancel described by
 * args->flags may be started on lkb.  Both callers in this file
 * (unlock_lock() and cancel_lock()) invoke it between lock_rsb() and
 * unlock_rsb() on lkb->lkb_resource.
 *
 * Returns 0 when the operation should go ahead; in that case args->flags
 * has been OR-ed into lkb_exflags, the sb flags value has been reset to 0
 * and lkb_astparam has been taken from args.  Otherwise:
 *
 *   -EBUSY   another operation is outstanding on the lkb, the lkb is
 *            still on an rsb_lookup list, or there is nothing to cancel.
 *            For CANCEL/FORCEUNLOCK an OVERLAP_CANCEL/OVERLAP_UNLOCK bit
 *            may have been set, or the lkb may already have been completed
 *            here via queue_cast(); dlm_unlock() turns -EBUSY into 0 for
 *            those two flags.
 *   -EINVAL  the request is a usage error (master copy lkb, an unlock or
 *            cancel already overlapping/in progress); also triggers a
 *            WARN_ON and an error log.
 *   -ENOENT  the lkb is flagged ENDOFLIFE.
 */
static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int rv = -EBUSY;

	/* normal unlock not allowed if there's any op in progress */
	if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) &&
	    (lkb->lkb_wait_type || lkb->lkb_wait_count))
		goto out;

	/* an lkb may be waiting for an rsb lookup to complete where the
	   lookup was initiated by another lock */

	if (!list_empty(&lkb->lkb_rsb_lookup)) {
		if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
			/* take the lkb off the lookup list and complete it
			   right away with -DLM_ECANCEL or -DLM_EUNLOCK */
			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
			list_del_init(&lkb->lkb_rsb_lookup);
			queue_cast(lkb->lkb_resource, lkb,
				   args->flags & DLM_LKF_CANCEL ?
				   -DLM_ECANCEL : -DLM_EUNLOCK);
			unhold_lkb(lkb); /* undoes create_lkb() */
		}
		/* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
		goto out;
	}

	/* from here on, a bare "goto out" reports -EINVAL unless rv is
	   reassigned first */
	rv = -EINVAL;
	if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
		dlm_print_lkb(lkb);
		goto out;
	}

	/* an lkb may still exist even though the lock is EOL'ed due to a
	 * cancel, unlock or failed noqueue request; an app can't use these
	 * locks; return same error as if the lkid had not been found at all
	 */

	if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
		rv = -ENOENT;
		goto out;
	}

	/* an unlock is already overlapping another op: -EINVAL */
	if (is_overlap_unlock(lkb))
		goto out;

	/* cancel not allowed with another cancel/unlock in progress */

	if (args->flags & DLM_LKF_CANCEL) {
		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
			goto out;

		if (is_overlap_cancel(lkb))
			goto out;

		/* lkb is flagged RESEND: only record the overlapping cancel */
		if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
			rv = -EBUSY;
			goto out;
		}

		/* there's nothing to cancel */
		if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
		    !lkb->lkb_wait_type) {
			rv = -EBUSY;
			goto out;
		}

		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			/* waiting on a lookup/request reply: mark the
			   overlap and report -EBUSY */
			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
		case DLM_MSG_CANCEL:
			/* an unlock/cancel is already being waited on: -EINVAL */
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_CANCEL */
		goto out_ok;
	}

	/* do we need to allow a force-unlock if there's a normal unlock
	   already in progress?  in what conditions could the normal unlock
	   fail such that we'd want to send a force-unlock to be sure? */

	if (args->flags & DLM_LKF_FORCEUNLOCK) {
		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
			goto out;

		/* lkb is flagged RESEND: only record the overlapping unlock */
		if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
			rv = -EBUSY;
			goto out;
		}

		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
			/* an unlock is already being waited on: -EINVAL */
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_UNLOCK */
	}

 out_ok:
	/* an overlapping op shouldn't blow away exflags from other op */
	lkb->lkb_exflags |= args->flags;
	dlm_set_sbflags_val(lkb, 0);
	lkb->lkb_astparam = args->astparam;
	rv = 0;
 out:
	/* success is silent, -EINVAL is loud, everything else is debug-only */
	switch (rv) {
	case 0:
		break;
	case -EINVAL:
		/* annoy the user because dlm usage is wrong */
		WARN_ON(1);
		log_error(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
			  lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
			  args->flags, lkb->lkb_wait_type,
			  lkb->lkb_resource->res_name);
		break;
	default:
		log_debug(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
			  lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
			  args->flags, lkb->lkb_wait_type,
			  lkb->lkb_resource->res_name);
		break;
	}

	return rv;
}
3013 
3014 /*
3015  * Four stage 4 varieties:
3016  * do_request(), do_convert(), do_unlock(), do_cancel()
3017  * These are called on the master node for the given lock and
3018  * from the central locking logic.
3019  */
3020 
3021 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3022 {
3023 	int error = 0;
3024 
3025 	if (can_be_granted(r, lkb, 1, 0, NULL)) {
3026 		grant_lock(r, lkb);
3027 		queue_cast(r, lkb, 0);
3028 		goto out;
3029 	}
3030 
3031 	if (can_be_queued(lkb)) {
3032 		error = -EINPROGRESS;
3033 		add_lkb(r, lkb, DLM_LKSTS_WAITING);
3034 		goto out;
3035 	}
3036 
3037 	error = -EAGAIN;
3038 	queue_cast(r, lkb, -EAGAIN);
3039  out:
3040 	return error;
3041 }
3042 
/*
 * Follow-up work after do_request(), keyed on its return value:
 *   -EINPROGRESS: send blocking asts for the newly queued lkb
 *   -EAGAIN:      send blocking asts to all only if force_blocking_asts()
 *                 says so for this lkb
 * Any other value needs no follow-up.
 */
static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			       int error)
{
	if (error == -EINPROGRESS) {
		send_blocking_asts(r, lkb);
		return;
	}

	if (error == -EAGAIN && force_blocking_asts(lkb))
		send_blocking_asts_all(r, lkb);
}
3056 
3057 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3058 {
3059 	int error = 0;
3060 	int deadlk = 0;
3061 
3062 	/* changing an existing lock may allow others to be granted */
3063 
3064 	if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3065 		grant_lock(r, lkb);
3066 		queue_cast(r, lkb, 0);
3067 		goto out;
3068 	}
3069 
3070 	/* can_be_granted() detected that this lock would block in a conversion
3071 	   deadlock, so we leave it on the granted queue and return EDEADLK in
3072 	   the ast for the convert. */
3073 
3074 	if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
3075 		/* it's left on the granted queue */
3076 		revert_lock(r, lkb);
3077 		queue_cast(r, lkb, -EDEADLK);
3078 		error = -EDEADLK;
3079 		goto out;
3080 	}
3081 
3082 	/* is_demoted() means the can_be_granted() above set the grmode
3083 	   to NL, and left us on the granted queue.  This auto-demotion
3084 	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
3085 	   now grantable.  We have to try to grant other converting locks
3086 	   before we try again to grant this one. */
3087 
3088 	if (is_demoted(lkb)) {
3089 		grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3090 		if (_can_be_granted(r, lkb, 1, 0)) {
3091 			grant_lock(r, lkb);
3092 			queue_cast(r, lkb, 0);
3093 			goto out;
3094 		}
3095 		/* else fall through and move to convert queue */
3096 	}
3097 
3098 	if (can_be_queued(lkb)) {
3099 		error = -EINPROGRESS;
3100 		del_lkb(r, lkb);
3101 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3102 		goto out;
3103 	}
3104 
3105 	error = -EAGAIN;
3106 	queue_cast(r, lkb, -EAGAIN);
3107  out:
3108 	return error;
3109 }
3110 
3111 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3112 			       int error)
3113 {
3114 	switch (error) {
3115 	case 0:
3116 		grant_pending_locks(r, NULL);
3117 		/* grant_pending_locks also sends basts */
3118 		break;
3119 	case -EAGAIN:
3120 		if (force_blocking_asts(lkb))
3121 			send_blocking_asts_all(r, lkb);
3122 		break;
3123 	case -EINPROGRESS:
3124 		send_blocking_asts(r, lkb);
3125 		break;
3126 	}
3127 }
3128 
3129 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3130 {
3131 	remove_lock(r, lkb);
3132 	queue_cast(r, lkb, -DLM_EUNLOCK);
3133 	return -DLM_EUNLOCK;
3134 }
3135 
/*
 * Follow-up work after do_unlock(): try to grant locks that are pending
 * on the rsb.  The lkb and error arguments are not used here.
 */
static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
			      int error)
{
	grant_pending_locks(r, NULL);
}
3141 
3142 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3143 
3144 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3145 {
3146 	int error;
3147 
3148 	error = revert_lock(r, lkb);
3149 	if (error) {
3150 		queue_cast(r, lkb, -DLM_ECANCEL);
3151 		return -DLM_ECANCEL;
3152 	}
3153 	return 0;
3154 }
3155 
3156 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3157 			      int error)
3158 {
3159 	if (error)
3160 		grant_pending_locks(r, NULL);
3161 }
3162 
3163 /*
3164  * Four stage 3 varieties:
3165  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3166  */
3167 
/*
 * Stage 3 of a request: add a new lkb to a possibly new rsb, called by
 * the requesting process.
 *
 * set_master() sets the lkb nodeid from r.  A negative result from it is
 * returned as is; a positive result ends the call early with 0.  Otherwise
 * the request is sent to the remote master or processed locally.
 */

static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	rv = set_master(r, lkb);
	if (rv < 0)
		return rv;
	if (rv > 0)
		return 0;

	/* receive_request() calls do_request() on remote node */
	if (is_remote(r))
		return send_request(r, lkb);

	/* for remote locks the request_reply is sent between do_request
	   and do_request_effects */
	rv = do_request(r, lkb);
	do_request_effects(r, lkb, rv);
	return rv;
}
3196 
/*
 * Stage 3 of a convert: change some property of an existing lkb, e.g.
 * its mode.  Sends the convert to the master when the rsb is remote,
 * otherwise runs do_convert() and its effects locally.
 */

static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	/* receive_convert() calls do_convert() on remote node */
	if (is_remote(r))
		return send_convert(r, lkb);

	/* for remote locks the convert_reply is sent between do_convert
	   and do_convert_effects */
	rv = do_convert(r, lkb);
	do_convert_effects(r, lkb, rv);
	return rv;
}
3215 
/*
 * Stage 3 of an unlock: remove an existing lkb from the granted queue.
 * Sends the unlock to the master when the rsb is remote, otherwise runs
 * do_unlock() and its effects locally.
 */

static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	/* receive_unlock() calls do_unlock() on remote node */
	if (is_remote(r))
		return send_unlock(r, lkb);

	/* for remote locks the unlock_reply is sent between do_unlock
	   and do_unlock_effects */
	rv = do_unlock(r, lkb);
	do_unlock_effects(r, lkb, rv);
	return rv;
}
3234 
/*
 * Stage 3 of a cancel: remove an existing lkb from the convert or wait
 * queue.  Sends the cancel to the master when the rsb is remote,
 * otherwise runs do_cancel() and its effects locally.
 */

static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	/* receive_cancel() calls do_cancel() on remote node */
	if (is_remote(r))
		return send_cancel(r, lkb);

	/* for remote locks the cancel_reply is sent between do_cancel
	   and do_cancel_effects */
	rv = do_cancel(r, lkb);
	do_cancel_effects(r, lkb, rv);
	return rv;
}
3253 
3254 /*
3255  * Four stage 2 varieties:
3256  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3257  */
3258 
3259 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3260 			const void *name, int len,
3261 			struct dlm_args *args)
3262 {
3263 	struct dlm_rsb *r;
3264 	int error;
3265 
3266 	error = validate_lock_args(ls, lkb, args);
3267 	if (error)
3268 		return error;
3269 
3270 	error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3271 	if (error)
3272 		return error;
3273 
3274 	lock_rsb(r);
3275 
3276 	attach_lkb(r, lkb);
3277 	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3278 
3279 	error = _request_lock(r, lkb);
3280 
3281 	unlock_rsb(r);
3282 	put_rsb(r);
3283 	return error;
3284 }
3285 
3286 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3287 			struct dlm_args *args)
3288 {
3289 	struct dlm_rsb *r;
3290 	int error;
3291 
3292 	r = lkb->lkb_resource;
3293 
3294 	hold_rsb(r);
3295 	lock_rsb(r);
3296 
3297 	error = validate_lock_args(ls, lkb, args);
3298 	if (error)
3299 		goto out;
3300 
3301 	error = _convert_lock(r, lkb);
3302  out:
3303 	unlock_rsb(r);
3304 	put_rsb(r);
3305 	return error;
3306 }
3307 
3308 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3309 		       struct dlm_args *args)
3310 {
3311 	struct dlm_rsb *r;
3312 	int error;
3313 
3314 	r = lkb->lkb_resource;
3315 
3316 	hold_rsb(r);
3317 	lock_rsb(r);
3318 
3319 	error = validate_unlock_args(lkb, args);
3320 	if (error)
3321 		goto out;
3322 
3323 	error = _unlock_lock(r, lkb);
3324  out:
3325 	unlock_rsb(r);
3326 	put_rsb(r);
3327 	return error;
3328 }
3329 
3330 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3331 		       struct dlm_args *args)
3332 {
3333 	struct dlm_rsb *r;
3334 	int error;
3335 
3336 	r = lkb->lkb_resource;
3337 
3338 	hold_rsb(r);
3339 	lock_rsb(r);
3340 
3341 	error = validate_unlock_args(lkb, args);
3342 	if (error)
3343 		goto out;
3344 
3345 	error = _cancel_lock(r, lkb);
3346  out:
3347 	unlock_rsb(r);
3348 	put_rsb(r);
3349 	return error;
3350 }
3351 
3352 /*
3353  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
3354  */
3355 
3356 int dlm_lock(dlm_lockspace_t *lockspace,
3357 	     int mode,
3358 	     struct dlm_lksb *lksb,
3359 	     uint32_t flags,
3360 	     const void *name,
3361 	     unsigned int namelen,
3362 	     uint32_t parent_lkid,
3363 	     void (*ast) (void *astarg),
3364 	     void *astarg,
3365 	     void (*bast) (void *astarg, int mode))
3366 {
3367 	struct dlm_ls *ls;
3368 	struct dlm_lkb *lkb;
3369 	struct dlm_args args;
3370 	int error, convert = flags & DLM_LKF_CONVERT;
3371 
3372 	ls = dlm_find_lockspace_local(lockspace);
3373 	if (!ls)
3374 		return -EINVAL;
3375 
3376 	dlm_lock_recovery(ls);
3377 
3378 	if (convert)
3379 		error = find_lkb(ls, lksb->sb_lkid, &lkb);
3380 	else
3381 		error = create_lkb(ls, &lkb);
3382 
3383 	if (error)
3384 		goto out;
3385 
3386 	trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
3387 
3388 	error = set_lock_args(mode, lksb, flags, namelen, ast, astarg, bast,
3389 			      &args);
3390 	if (error)
3391 		goto out_put;
3392 
3393 	if (convert)
3394 		error = convert_lock(ls, lkb, &args);
3395 	else
3396 		error = request_lock(ls, lkb, name, namelen, &args);
3397 
3398 	if (error == -EINPROGRESS)
3399 		error = 0;
3400  out_put:
3401 	trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true);
3402 
3403 	if (convert || error)
3404 		__put_lkb(ls, lkb);
3405 	if (error == -EAGAIN || error == -EDEADLK)
3406 		error = 0;
3407  out:
3408 	dlm_unlock_recovery(ls);
3409 	dlm_put_lockspace(ls);
3410 	return error;
3411 }
3412 
3413 int dlm_unlock(dlm_lockspace_t *lockspace,
3414 	       uint32_t lkid,
3415 	       uint32_t flags,
3416 	       struct dlm_lksb *lksb,
3417 	       void *astarg)
3418 {
3419 	struct dlm_ls *ls;
3420 	struct dlm_lkb *lkb;
3421 	struct dlm_args args;
3422 	int error;
3423 
3424 	ls = dlm_find_lockspace_local(lockspace);
3425 	if (!ls)
3426 		return -EINVAL;
3427 
3428 	dlm_lock_recovery(ls);
3429 
3430 	error = find_lkb(ls, lkid, &lkb);
3431 	if (error)
3432 		goto out;
3433 
3434 	trace_dlm_unlock_start(ls, lkb, flags);
3435 
3436 	error = set_unlock_args(flags, astarg, &args);
3437 	if (error)
3438 		goto out_put;
3439 
3440 	if (flags & DLM_LKF_CANCEL)
3441 		error = cancel_lock(ls, lkb, &args);
3442 	else
3443 		error = unlock_lock(ls, lkb, &args);
3444 
3445 	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3446 		error = 0;
3447 	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3448 		error = 0;
3449  out_put:
3450 	trace_dlm_unlock_end(ls, lkb, flags, error);
3451 
3452 	dlm_put_lkb(lkb);
3453  out:
3454 	dlm_unlock_recovery(ls);
3455 	dlm_put_lockspace(ls);
3456 	return error;
3457 }
3458 
3459 /*
3460  * send/receive routines for remote operations and replies
3461  *
3462  * send_args
3463  * send_common
3464  * send_request			receive_request
3465  * send_convert			receive_convert
3466  * send_unlock			receive_unlock
3467  * send_cancel			receive_cancel
3468  * send_grant			receive_grant
3469  * send_bast			receive_bast
3470  * send_lookup			receive_lookup
3471  * send_remove			receive_remove
3472  *
3473  * 				send_common_reply
3474  * receive_request_reply	send_request_reply
3475  * receive_convert_reply	send_convert_reply
3476  * receive_unlock_reply		send_unlock_reply
3477  * receive_cancel_reply		send_cancel_reply
3478  * receive_lookup_reply		send_lookup_reply
3479  */
3480 
3481 static int _create_message(struct dlm_ls *ls, int mb_len,
3482 			   int to_nodeid, int mstype,
3483 			   struct dlm_message **ms_ret,
3484 			   struct dlm_mhandle **mh_ret)
3485 {
3486 	struct dlm_message *ms;
3487 	struct dlm_mhandle *mh;
3488 	char *mb;
3489 
3490 	/* get_buffer gives us a message handle (mh) that we need to
3491 	   pass into midcomms_commit and a message buffer (mb) that we
3492 	   write our data into */
3493 
3494 	mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb);
3495 	if (!mh)
3496 		return -ENOBUFS;
3497 
3498 	ms = (struct dlm_message *) mb;
3499 
3500 	ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3501 	ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
3502 	ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
3503 	ms->m_header.h_length = cpu_to_le16(mb_len);
3504 	ms->m_header.h_cmd = DLM_MSG;
3505 
3506 	ms->m_type = cpu_to_le32(mstype);
3507 
3508 	*mh_ret = mh;
3509 	*ms_ret = ms;
3510 	return 0;
3511 }
3512 
3513 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3514 			  int to_nodeid, int mstype,
3515 			  struct dlm_message **ms_ret,
3516 			  struct dlm_mhandle **mh_ret)
3517 {
3518 	int mb_len = sizeof(struct dlm_message);
3519 
3520 	switch (mstype) {
3521 	case DLM_MSG_REQUEST:
3522 	case DLM_MSG_LOOKUP:
3523 	case DLM_MSG_REMOVE:
3524 		mb_len += r->res_length;
3525 		break;
3526 	case DLM_MSG_CONVERT:
3527 	case DLM_MSG_UNLOCK:
3528 	case DLM_MSG_REQUEST_REPLY:
3529 	case DLM_MSG_CONVERT_REPLY:
3530 	case DLM_MSG_GRANT:
3531 		if (lkb && lkb->lkb_lvbptr && (lkb->lkb_exflags & DLM_LKF_VALBLK))
3532 			mb_len += r->res_ls->ls_lvblen;
3533 		break;
3534 	}
3535 
3536 	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3537 			       ms_ret, mh_ret);
3538 }
3539 
/* Commit a message created with create_message()/_create_message() by
   handing its handle to dlm_midcomms_commit_mhandle() together with
   name/namelen.  The ms argument is not used here.

   Always returns 0: further lowcomms enhancements or alternate
   implementations may make the return value from this function useful
   at some point */

static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms,
			const void *name, int namelen)
{
	dlm_midcomms_commit_mhandle(mh, name, namelen);
	return 0;
}
3549 
3550 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3551 		      struct dlm_message *ms)
3552 {
3553 	ms->m_nodeid   = cpu_to_le32(lkb->lkb_nodeid);
3554 	ms->m_pid      = cpu_to_le32(lkb->lkb_ownpid);
3555 	ms->m_lkid     = cpu_to_le32(lkb->lkb_id);
3556 	ms->m_remid    = cpu_to_le32(lkb->lkb_remid);
3557 	ms->m_exflags  = cpu_to_le32(lkb->lkb_exflags);
3558 	ms->m_sbflags  = cpu_to_le32(dlm_sbflags_val(lkb));
3559 	ms->m_flags    = cpu_to_le32(dlm_dflags_val(lkb));
3560 	ms->m_lvbseq   = cpu_to_le32(lkb->lkb_lvbseq);
3561 	ms->m_status   = cpu_to_le32(lkb->lkb_status);
3562 	ms->m_grmode   = cpu_to_le32(lkb->lkb_grmode);
3563 	ms->m_rqmode   = cpu_to_le32(lkb->lkb_rqmode);
3564 	ms->m_hash     = cpu_to_le32(r->res_hash);
3565 
3566 	/* m_result and m_bastmode are set from function args,
3567 	   not from lkb fields */
3568 
3569 	if (lkb->lkb_bastfn)
3570 		ms->m_asts |= cpu_to_le32(DLM_CB_BAST);
3571 	if (lkb->lkb_astfn)
3572 		ms->m_asts |= cpu_to_le32(DLM_CB_CAST);
3573 
3574 	/* compare with switch in create_message; send_remove() doesn't
3575 	   use send_args() */
3576 
3577 	switch (ms->m_type) {
3578 	case cpu_to_le32(DLM_MSG_REQUEST):
3579 	case cpu_to_le32(DLM_MSG_LOOKUP):
3580 		memcpy(ms->m_extra, r->res_name, r->res_length);
3581 		break;
3582 	case cpu_to_le32(DLM_MSG_CONVERT):
3583 	case cpu_to_le32(DLM_MSG_UNLOCK):
3584 	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3585 	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3586 	case cpu_to_le32(DLM_MSG_GRANT):
3587 		if (!lkb->lkb_lvbptr || !(lkb->lkb_exflags & DLM_LKF_VALBLK))
3588 			break;
3589 		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3590 		break;
3591 	}
3592 }
3593 
3594 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3595 {
3596 	struct dlm_message *ms;
3597 	struct dlm_mhandle *mh;
3598 	int to_nodeid, error;
3599 
3600 	to_nodeid = r->res_nodeid;
3601 
3602 	add_to_waiters(lkb, mstype, to_nodeid);
3603 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3604 	if (error)
3605 		goto fail;
3606 
3607 	send_args(r, lkb, ms);
3608 
3609 	error = send_message(mh, ms, r->res_name, r->res_length);
3610 	if (error)
3611 		goto fail;
3612 	return 0;
3613 
3614  fail:
3615 	remove_from_waiters(lkb, msg_reply_type(mstype));
3616 	return error;
3617 }
3618 
/* Send a DLM_MSG_REQUEST for lkb through send_common() and return its
   result. */
static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_REQUEST);
}
3623 
3624 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3625 {
3626 	int error;
3627 
3628 	error = send_common(r, lkb, DLM_MSG_CONVERT);
3629 
3630 	/* down conversions go without a reply from the master */
3631 	if (!error && down_conversion(lkb)) {
3632 		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3633 		r->res_ls->ls_local_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
3634 		r->res_ls->ls_local_ms.m_result = 0;
3635 		__receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms, true);
3636 	}
3637 
3638 	return error;
3639 }
3640 
3641 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3642    MASTER_UNCERTAIN to force the next request on the rsb to confirm
3643    that the master is still correct. */
3644 
/* Send a DLM_MSG_UNLOCK for lkb through send_common() and return its
   result. */
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_UNLOCK);
}
3649 
/* Send a DLM_MSG_CANCEL for lkb through send_common() and return its
   result. */
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_CANCEL);
}
3654 
3655 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3656 {
3657 	struct dlm_message *ms;
3658 	struct dlm_mhandle *mh;
3659 	int to_nodeid, error;
3660 
3661 	to_nodeid = lkb->lkb_nodeid;
3662 
3663 	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3664 	if (error)
3665 		goto out;
3666 
3667 	send_args(r, lkb, ms);
3668 
3669 	ms->m_result = 0;
3670 
3671 	error = send_message(mh, ms, r->res_name, r->res_length);
3672  out:
3673 	return error;
3674 }
3675 
3676 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3677 {
3678 	struct dlm_message *ms;
3679 	struct dlm_mhandle *mh;
3680 	int to_nodeid, error;
3681 
3682 	to_nodeid = lkb->lkb_nodeid;
3683 
3684 	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3685 	if (error)
3686 		goto out;
3687 
3688 	send_args(r, lkb, ms);
3689 
3690 	ms->m_bastmode = cpu_to_le32(mode);
3691 
3692 	error = send_message(mh, ms, r->res_name, r->res_length);
3693  out:
3694 	return error;
3695 }
3696 
3697 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3698 {
3699 	struct dlm_message *ms;
3700 	struct dlm_mhandle *mh;
3701 	int to_nodeid, error;
3702 
3703 	to_nodeid = dlm_dir_nodeid(r);
3704 
3705 	add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3706 	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3707 	if (error)
3708 		goto fail;
3709 
3710 	send_args(r, lkb, ms);
3711 
3712 	error = send_message(mh, ms, r->res_name, r->res_length);
3713 	if (error)
3714 		goto fail;
3715 	return 0;
3716 
3717  fail:
3718 	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3719 	return error;
3720 }
3721 
3722 static int send_remove(struct dlm_rsb *r)
3723 {
3724 	struct dlm_message *ms;
3725 	struct dlm_mhandle *mh;
3726 	int to_nodeid, error;
3727 
3728 	to_nodeid = dlm_dir_nodeid(r);
3729 
3730 	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3731 	if (error)
3732 		goto out;
3733 
3734 	memcpy(ms->m_extra, r->res_name, r->res_length);
3735 	ms->m_hash = cpu_to_le32(r->res_hash);
3736 
3737 	error = send_message(mh, ms, r->res_name, r->res_length);
3738  out:
3739 	return error;
3740 }
3741 
3742 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3743 			     int mstype, int rv)
3744 {
3745 	struct dlm_message *ms;
3746 	struct dlm_mhandle *mh;
3747 	int to_nodeid, error;
3748 
3749 	to_nodeid = lkb->lkb_nodeid;
3750 
3751 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3752 	if (error)
3753 		goto out;
3754 
3755 	send_args(r, lkb, ms);
3756 
3757 	ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3758 
3759 	error = send_message(mh, ms, r->res_name, r->res_length);
3760  out:
3761 	return error;
3762 }
3763 
/* Reply to a request: send DLM_MSG_REQUEST_REPLY carrying rv through
   send_common_reply() and return its result. */
static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
}
3768 
/* Reply to a convert: send DLM_MSG_CONVERT_REPLY carrying rv through
   send_common_reply() and return its result. */
static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
}
3773 
/* Reply to an unlock: send DLM_MSG_UNLOCK_REPLY carrying rv through
   send_common_reply() and return its result. */
static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
}
3778 
/* Reply to a cancel: send DLM_MSG_CANCEL_REPLY carrying rv through
   send_common_reply() and return its result. */
static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
}
3783 
3784 static int send_lookup_reply(struct dlm_ls *ls,
3785 			     const struct dlm_message *ms_in, int ret_nodeid,
3786 			     int rv)
3787 {
3788 	struct dlm_rsb *r = &ls->ls_local_rsb;
3789 	struct dlm_message *ms;
3790 	struct dlm_mhandle *mh;
3791 	int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
3792 
3793 	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3794 	if (error)
3795 		goto out;
3796 
3797 	ms->m_lkid = ms_in->m_lkid;
3798 	ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3799 	ms->m_nodeid = cpu_to_le32(ret_nodeid);
3800 
3801 	error = send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in));
3802  out:
3803 	return error;
3804 }
3805 
3806 /* which args we save from a received message depends heavily on the type
3807    of message, unlike the send side where we can safely send everything about
3808    the lkb for any type of message */
3809 
3810 static void receive_flags(struct dlm_lkb *lkb, const struct dlm_message *ms)
3811 {
3812 	lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
3813 	dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3814 	dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3815 }
3816 
3817 static void receive_flags_reply(struct dlm_lkb *lkb,
3818 				const struct dlm_message *ms,
3819 				bool local)
3820 {
3821 	if (local)
3822 		return;
3823 
3824 	dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3825 	dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3826 }
3827 
3828 static int receive_extralen(const struct dlm_message *ms)
3829 {
3830 	return (le16_to_cpu(ms->m_header.h_length) -
3831 		sizeof(struct dlm_message));
3832 }
3833 
3834 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3835 		       const struct dlm_message *ms)
3836 {
3837 	int len;
3838 
3839 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3840 		if (!lkb->lkb_lvbptr)
3841 			lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3842 		if (!lkb->lkb_lvbptr)
3843 			return -ENOMEM;
3844 		len = receive_extralen(ms);
3845 		if (len > ls->ls_lvblen)
3846 			len = ls->ls_lvblen;
3847 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3848 	}
3849 	return 0;
3850 }
3851 
/* Placeholder blocking-ast function.  receive_request_args() stores it in
   lkb_bastfn when the sender's m_asts has DLM_CB_BAST set; it only logs a
   message if it is ever called.  Both arguments are ignored. */
static void fake_bastfn(void *astparam, int mode)
{
	log_print("fake_bastfn should not be called");
}
3856 
/* Placeholder completion-ast function.  receive_request_args() stores it
   in lkb_astfn when the sender's m_asts has DLM_CB_CAST set; it only logs
   a message if it is ever called.  The argument is ignored. */
static void fake_astfn(void *astparam)
{
	log_print("fake_astfn should not be called");
}
3861 
3862 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3863 				const struct dlm_message *ms)
3864 {
3865 	lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3866 	lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
3867 	lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3868 	lkb->lkb_grmode = DLM_LOCK_IV;
3869 	lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3870 
3871 	lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;
3872 	lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL;
3873 
3874 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3875 		/* lkb was just created so there won't be an lvb yet */
3876 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3877 		if (!lkb->lkb_lvbptr)
3878 			return -ENOMEM;
3879 	}
3880 
3881 	return 0;
3882 }
3883 
3884 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3885 				const struct dlm_message *ms)
3886 {
3887 	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3888 		return -EBUSY;
3889 
3890 	if (receive_lvb(ls, lkb, ms))
3891 		return -ENOMEM;
3892 
3893 	lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3894 	lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
3895 
3896 	return 0;
3897 }
3898 
/*
 * Apply the arguments of a received unlock message to the lkb; only the
 * lvb is taken.  Returns -ENOMEM if receive_lvb() fails, 0 otherwise.
 */
static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			       const struct dlm_message *ms)
{
	return receive_lvb(ls, lkb, ms) ? -ENOMEM : 0;
}
3906 
3907 /* We fill in the local-lkb fields with the info that send_xxxx_reply()
3908    uses to send a reply and that the remote end uses to process the reply. */
3909 
3910 static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms)
3911 {
3912 	struct dlm_lkb *lkb = &ls->ls_local_lkb;
3913 	lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3914 	lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3915 }
3916 
3917 /* This is called after the rsb is locked so that we can safely inspect
3918    fields in the lkb. */
3919 
3920 static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
3921 {
3922 	int from = le32_to_cpu(ms->m_header.h_nodeid);
3923 	int error = 0;
3924 
3925 	/* currently mixing of user/kernel locks are not supported */
3926 	if (ms->m_flags & cpu_to_le32(BIT(DLM_DFL_USER_BIT)) &&
3927 	    !test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
3928 		log_error(lkb->lkb_resource->res_ls,
3929 			  "got user dlm message for a kernel lock");
3930 		error = -EINVAL;
3931 		goto out;
3932 	}
3933 
3934 	switch (ms->m_type) {
3935 	case cpu_to_le32(DLM_MSG_CONVERT):
3936 	case cpu_to_le32(DLM_MSG_UNLOCK):
3937 	case cpu_to_le32(DLM_MSG_CANCEL):
3938 		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3939 			error = -EINVAL;
3940 		break;
3941 
3942 	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3943 	case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
3944 	case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
3945 	case cpu_to_le32(DLM_MSG_GRANT):
3946 	case cpu_to_le32(DLM_MSG_BAST):
3947 		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3948 			error = -EINVAL;
3949 		break;
3950 
3951 	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3952 		if (!is_process_copy(lkb))
3953 			error = -EINVAL;
3954 		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3955 			error = -EINVAL;
3956 		break;
3957 
3958 	default:
3959 		error = -EINVAL;
3960 	}
3961 
3962 out:
3963 	if (error)
3964 		log_error(lkb->lkb_resource->res_ls,
3965 			  "ignore invalid message %d from %d %x %x %x %d",
3966 			  le32_to_cpu(ms->m_type), from, lkb->lkb_id,
3967 			  lkb->lkb_remid, dlm_iflags_val(lkb),
3968 			  lkb->lkb_nodeid);
3969 	return error;
3970 }
3971 
3972 static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms)
3973 {
3974 	struct dlm_lkb *lkb;
3975 	struct dlm_rsb *r;
3976 	int from_nodeid;
3977 	int error, namelen = 0;
3978 
3979 	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3980 
3981 	error = create_lkb(ls, &lkb);
3982 	if (error)
3983 		goto fail;
3984 
3985 	receive_flags(lkb, ms);
3986 	set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
3987 	error = receive_request_args(ls, lkb, ms);
3988 	if (error) {
3989 		__put_lkb(ls, lkb);
3990 		goto fail;
3991 	}
3992 
3993 	/* The dir node is the authority on whether we are the master
3994 	   for this rsb or not, so if the master sends us a request, we should
3995 	   recreate the rsb if we've destroyed it.   This race happens when we
3996 	   send a remove message to the dir node at the same time that the dir
3997 	   node sends us a request for the rsb. */
3998 
3999 	namelen = receive_extralen(ms);
4000 
4001 	error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4002 			 R_RECEIVE_REQUEST, &r);
4003 	if (error) {
4004 		__put_lkb(ls, lkb);
4005 		goto fail;
4006 	}
4007 
4008 	lock_rsb(r);
4009 
4010 	if (r->res_master_nodeid != dlm_our_nodeid()) {
4011 		error = validate_master_nodeid(ls, r, from_nodeid);
4012 		if (error) {
4013 			unlock_rsb(r);
4014 			put_rsb(r);
4015 			__put_lkb(ls, lkb);
4016 			goto fail;
4017 		}
4018 	}
4019 
4020 	attach_lkb(r, lkb);
4021 	error = do_request(r, lkb);
4022 	send_request_reply(r, lkb, error);
4023 	do_request_effects(r, lkb, error);
4024 
4025 	unlock_rsb(r);
4026 	put_rsb(r);
4027 
4028 	if (error == -EINPROGRESS)
4029 		error = 0;
4030 	if (error)
4031 		dlm_put_lkb(lkb);
4032 	return 0;
4033 
4034  fail:
4035 	/* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4036 	   and do this receive_request again from process_lookup_list once
4037 	   we get the lookup reply.  This would avoid a many repeated
4038 	   ENOTBLK request failures when the lookup reply designating us
4039 	   as master is delayed. */
4040 
4041 	if (error != -ENOTBLK) {
4042 		log_limit(ls, "receive_request %x from %d %d",
4043 			  le32_to_cpu(ms->m_lkid), from_nodeid, error);
4044 	}
4045 
4046 	setup_local_lkb(ls, ms);
4047 	send_request_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4048 	return error;
4049 }
4050 
4051 static int receive_convert(struct dlm_ls *ls, const struct dlm_message *ms)
4052 {
4053 	struct dlm_lkb *lkb;
4054 	struct dlm_rsb *r;
4055 	int error, reply = 1;
4056 
4057 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4058 	if (error)
4059 		goto fail;
4060 
4061 	if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4062 		log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4063 			  "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4064 			  (unsigned long long)lkb->lkb_recover_seq,
4065 			  le32_to_cpu(ms->m_header.h_nodeid),
4066 			  le32_to_cpu(ms->m_lkid));
4067 		error = -ENOENT;
4068 		dlm_put_lkb(lkb);
4069 		goto fail;
4070 	}
4071 
4072 	r = lkb->lkb_resource;
4073 
4074 	hold_rsb(r);
4075 	lock_rsb(r);
4076 
4077 	error = validate_message(lkb, ms);
4078 	if (error)
4079 		goto out;
4080 
4081 	receive_flags(lkb, ms);
4082 
4083 	error = receive_convert_args(ls, lkb, ms);
4084 	if (error) {
4085 		send_convert_reply(r, lkb, error);
4086 		goto out;
4087 	}
4088 
4089 	reply = !down_conversion(lkb);
4090 
4091 	error = do_convert(r, lkb);
4092 	if (reply)
4093 		send_convert_reply(r, lkb, error);
4094 	do_convert_effects(r, lkb, error);
4095  out:
4096 	unlock_rsb(r);
4097 	put_rsb(r);
4098 	dlm_put_lkb(lkb);
4099 	return 0;
4100 
4101  fail:
4102 	setup_local_lkb(ls, ms);
4103 	send_convert_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4104 	return error;
4105 }
4106 
4107 static int receive_unlock(struct dlm_ls *ls, const struct dlm_message *ms)
4108 {
4109 	struct dlm_lkb *lkb;
4110 	struct dlm_rsb *r;
4111 	int error;
4112 
4113 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4114 	if (error)
4115 		goto fail;
4116 
4117 	if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4118 		log_error(ls, "receive_unlock %x remid %x remote %d %x",
4119 			  lkb->lkb_id, lkb->lkb_remid,
4120 			  le32_to_cpu(ms->m_header.h_nodeid),
4121 			  le32_to_cpu(ms->m_lkid));
4122 		error = -ENOENT;
4123 		dlm_put_lkb(lkb);
4124 		goto fail;
4125 	}
4126 
4127 	r = lkb->lkb_resource;
4128 
4129 	hold_rsb(r);
4130 	lock_rsb(r);
4131 
4132 	error = validate_message(lkb, ms);
4133 	if (error)
4134 		goto out;
4135 
4136 	receive_flags(lkb, ms);
4137 
4138 	error = receive_unlock_args(ls, lkb, ms);
4139 	if (error) {
4140 		send_unlock_reply(r, lkb, error);
4141 		goto out;
4142 	}
4143 
4144 	error = do_unlock(r, lkb);
4145 	send_unlock_reply(r, lkb, error);
4146 	do_unlock_effects(r, lkb, error);
4147  out:
4148 	unlock_rsb(r);
4149 	put_rsb(r);
4150 	dlm_put_lkb(lkb);
4151 	return 0;
4152 
4153  fail:
4154 	setup_local_lkb(ls, ms);
4155 	send_unlock_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4156 	return error;
4157 }
4158 
4159 static int receive_cancel(struct dlm_ls *ls, const struct dlm_message *ms)
4160 {
4161 	struct dlm_lkb *lkb;
4162 	struct dlm_rsb *r;
4163 	int error;
4164 
4165 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4166 	if (error)
4167 		goto fail;
4168 
4169 	receive_flags(lkb, ms);
4170 
4171 	r = lkb->lkb_resource;
4172 
4173 	hold_rsb(r);
4174 	lock_rsb(r);
4175 
4176 	error = validate_message(lkb, ms);
4177 	if (error)
4178 		goto out;
4179 
4180 	error = do_cancel(r, lkb);
4181 	send_cancel_reply(r, lkb, error);
4182 	do_cancel_effects(r, lkb, error);
4183  out:
4184 	unlock_rsb(r);
4185 	put_rsb(r);
4186 	dlm_put_lkb(lkb);
4187 	return 0;
4188 
4189  fail:
4190 	setup_local_lkb(ls, ms);
4191 	send_cancel_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4192 	return error;
4193 }
4194 
4195 static int receive_grant(struct dlm_ls *ls, const struct dlm_message *ms)
4196 {
4197 	struct dlm_lkb *lkb;
4198 	struct dlm_rsb *r;
4199 	int error;
4200 
4201 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4202 	if (error)
4203 		return error;
4204 
4205 	r = lkb->lkb_resource;
4206 
4207 	hold_rsb(r);
4208 	lock_rsb(r);
4209 
4210 	error = validate_message(lkb, ms);
4211 	if (error)
4212 		goto out;
4213 
4214 	receive_flags_reply(lkb, ms, false);
4215 	if (is_altmode(lkb))
4216 		munge_altmode(lkb, ms);
4217 	grant_lock_pc(r, lkb, ms);
4218 	queue_cast(r, lkb, 0);
4219  out:
4220 	unlock_rsb(r);
4221 	put_rsb(r);
4222 	dlm_put_lkb(lkb);
4223 	return 0;
4224 }
4225 
4226 static int receive_bast(struct dlm_ls *ls, const struct dlm_message *ms)
4227 {
4228 	struct dlm_lkb *lkb;
4229 	struct dlm_rsb *r;
4230 	int error;
4231 
4232 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4233 	if (error)
4234 		return error;
4235 
4236 	r = lkb->lkb_resource;
4237 
4238 	hold_rsb(r);
4239 	lock_rsb(r);
4240 
4241 	error = validate_message(lkb, ms);
4242 	if (error)
4243 		goto out;
4244 
4245 	queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode));
4246 	lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
4247  out:
4248 	unlock_rsb(r);
4249 	put_rsb(r);
4250 	dlm_put_lkb(lkb);
4251 	return 0;
4252 }
4253 
4254 static void receive_lookup(struct dlm_ls *ls, const struct dlm_message *ms)
4255 {
4256 	int len, error, ret_nodeid, from_nodeid, our_nodeid;
4257 
4258 	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4259 	our_nodeid = dlm_our_nodeid();
4260 
4261 	len = receive_extralen(ms);
4262 
4263 	error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4264 				  &ret_nodeid, NULL);
4265 
4266 	/* Optimization: we're master so treat lookup as a request */
4267 	if (!error && ret_nodeid == our_nodeid) {
4268 		receive_request(ls, ms);
4269 		return;
4270 	}
4271 	send_lookup_reply(ls, ms, ret_nodeid, error);
4272 }
4273 
4274 static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
4275 {
4276 	char name[DLM_RESNAME_MAXLEN+1];
4277 	struct dlm_rsb *r;
4278 	int rv, len, dir_nodeid, from_nodeid;
4279 
4280 	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4281 
4282 	len = receive_extralen(ms);
4283 
4284 	if (len > DLM_RESNAME_MAXLEN) {
4285 		log_error(ls, "receive_remove from %d bad len %d",
4286 			  from_nodeid, len);
4287 		return;
4288 	}
4289 
4290 	dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash));
4291 	if (dir_nodeid != dlm_our_nodeid()) {
4292 		log_error(ls, "receive_remove from %d bad nodeid %d",
4293 			  from_nodeid, dir_nodeid);
4294 		return;
4295 	}
4296 
4297 	/*
4298 	 * Look for inactive rsb, if it's there, free it.
4299 	 * If the rsb is active, it's being used, and we should ignore this
4300 	 * message.  This is an expected race between the dir node sending a
4301 	 * request to the master node at the same time as the master node sends
4302 	 * a remove to the dir node.  The resolution to that race is for the
4303 	 * dir node to ignore the remove message, and the master node to
4304 	 * recreate the master rsb when it gets a request from the dir node for
4305 	 * an rsb it doesn't have.
4306 	 */
4307 
4308 	memset(name, 0, sizeof(name));
4309 	memcpy(name, ms->m_extra, len);
4310 
4311 	rcu_read_lock();
4312 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
4313 	if (rv) {
4314 		rcu_read_unlock();
4315 		/* should not happen */
4316 		log_error(ls, "%s from %d not found %s", __func__,
4317 			  from_nodeid, name);
4318 		return;
4319 	}
4320 
4321 	write_lock_bh(&ls->ls_rsbtbl_lock);
4322 	if (!rsb_flag(r, RSB_HASHED)) {
4323 		rcu_read_unlock();
4324 		write_unlock_bh(&ls->ls_rsbtbl_lock);
4325 		/* should not happen */
4326 		log_error(ls, "%s from %d got removed during removal %s",
4327 			  __func__, from_nodeid, name);
4328 		return;
4329 	}
4330 	/* at this stage the rsb can only being freed here */
4331 	rcu_read_unlock();
4332 
4333 	if (!rsb_flag(r, RSB_INACTIVE)) {
4334 		if (r->res_master_nodeid != from_nodeid) {
4335 			/* should not happen */
4336 			log_error(ls, "receive_remove on active rsb from %d master %d",
4337 				  from_nodeid, r->res_master_nodeid);
4338 			dlm_print_rsb(r);
4339 			write_unlock_bh(&ls->ls_rsbtbl_lock);
4340 			return;
4341 		}
4342 
4343 		/* Ignore the remove message, see race comment above. */
4344 
4345 		log_debug(ls, "receive_remove from %d master %d first %x %s",
4346 			  from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4347 			  name);
4348 		write_unlock_bh(&ls->ls_rsbtbl_lock);
4349 		return;
4350 	}
4351 
4352 	if (r->res_master_nodeid != from_nodeid) {
4353 		log_error(ls, "receive_remove inactive from %d master %d",
4354 			  from_nodeid, r->res_master_nodeid);
4355 		dlm_print_rsb(r);
4356 		write_unlock_bh(&ls->ls_rsbtbl_lock);
4357 		return;
4358 	}
4359 
4360 	list_del(&r->res_slow_list);
4361 	rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
4362 			       dlm_rhash_rsb_params);
4363 	rsb_clear_flag(r, RSB_HASHED);
4364 	write_unlock_bh(&ls->ls_rsbtbl_lock);
4365 
4366 	free_inactive_rsb(r);
4367 }
4368 
4369 static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms)
4370 {
4371 	do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
4372 }
4373 
4374 static int receive_request_reply(struct dlm_ls *ls,
4375 				 const struct dlm_message *ms)
4376 {
4377 	struct dlm_lkb *lkb;
4378 	struct dlm_rsb *r;
4379 	int error, mstype, result;
4380 	int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4381 
4382 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4383 	if (error)
4384 		return error;
4385 
4386 	r = lkb->lkb_resource;
4387 	hold_rsb(r);
4388 	lock_rsb(r);
4389 
4390 	error = validate_message(lkb, ms);
4391 	if (error)
4392 		goto out;
4393 
4394 	mstype = lkb->lkb_wait_type;
4395 	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4396 	if (error) {
4397 		log_error(ls, "receive_request_reply %x remote %d %x result %d",
4398 			  lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid),
4399 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4400 		dlm_dump_rsb(r);
4401 		goto out;
4402 	}
4403 
4404 	/* Optimization: the dir node was also the master, so it took our
4405 	   lookup as a request and sent request reply instead of lookup reply */
4406 	if (mstype == DLM_MSG_LOOKUP) {
4407 		r->res_master_nodeid = from_nodeid;
4408 		r->res_nodeid = from_nodeid;
4409 		lkb->lkb_nodeid = from_nodeid;
4410 	}
4411 
4412 	/* this is the value returned from do_request() on the master */
4413 	result = from_dlm_errno(le32_to_cpu(ms->m_result));
4414 
4415 	switch (result) {
4416 	case -EAGAIN:
4417 		/* request would block (be queued) on remote master */
4418 		queue_cast(r, lkb, -EAGAIN);
4419 		confirm_master(r, -EAGAIN);
4420 		unhold_lkb(lkb); /* undoes create_lkb() */
4421 		break;
4422 
4423 	case -EINPROGRESS:
4424 	case 0:
4425 		/* request was queued or granted on remote master */
4426 		receive_flags_reply(lkb, ms, false);
4427 		lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
4428 		if (is_altmode(lkb))
4429 			munge_altmode(lkb, ms);
4430 		if (result) {
4431 			add_lkb(r, lkb, DLM_LKSTS_WAITING);
4432 		} else {
4433 			grant_lock_pc(r, lkb, ms);
4434 			queue_cast(r, lkb, 0);
4435 		}
4436 		confirm_master(r, result);
4437 		break;
4438 
4439 	case -EBADR:
4440 	case -ENOTBLK:
4441 		/* find_rsb failed to find rsb or rsb wasn't master */
4442 		log_limit(ls, "receive_request_reply %x from %d %d "
4443 			  "master %d dir %d first %x %s", lkb->lkb_id,
4444 			  from_nodeid, result, r->res_master_nodeid,
4445 			  r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4446 
4447 		if (r->res_dir_nodeid != dlm_our_nodeid() &&
4448 		    r->res_master_nodeid != dlm_our_nodeid()) {
4449 			/* cause _request_lock->set_master->send_lookup */
4450 			r->res_master_nodeid = 0;
4451 			r->res_nodeid = -1;
4452 			lkb->lkb_nodeid = -1;
4453 		}
4454 
4455 		if (is_overlap(lkb)) {
4456 			/* we'll ignore error in cancel/unlock reply */
4457 			queue_cast_overlap(r, lkb);
4458 			confirm_master(r, result);
4459 			unhold_lkb(lkb); /* undoes create_lkb() */
4460 		} else {
4461 			_request_lock(r, lkb);
4462 
4463 			if (r->res_master_nodeid == dlm_our_nodeid())
4464 				confirm_master(r, 0);
4465 		}
4466 		break;
4467 
4468 	default:
4469 		log_error(ls, "receive_request_reply %x error %d",
4470 			  lkb->lkb_id, result);
4471 	}
4472 
4473 	if ((result == 0 || result == -EINPROGRESS) &&
4474 	    test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
4475 		log_debug(ls, "receive_request_reply %x result %d unlock",
4476 			  lkb->lkb_id, result);
4477 		clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4478 		send_unlock(r, lkb);
4479 	} else if ((result == -EINPROGRESS) &&
4480 		   test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
4481 				      &lkb->lkb_iflags)) {
4482 		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4483 		clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4484 		send_cancel(r, lkb);
4485 	} else {
4486 		clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4487 		clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4488 	}
4489  out:
4490 	unlock_rsb(r);
4491 	put_rsb(r);
4492 	dlm_put_lkb(lkb);
4493 	return 0;
4494 }
4495 
4496 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4497 				    const struct dlm_message *ms, bool local)
4498 {
4499 	/* this is the value returned from do_convert() on the master */
4500 	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4501 	case -EAGAIN:
4502 		/* convert would block (be queued) on remote master */
4503 		queue_cast(r, lkb, -EAGAIN);
4504 		break;
4505 
4506 	case -EDEADLK:
4507 		receive_flags_reply(lkb, ms, local);
4508 		revert_lock_pc(r, lkb);
4509 		queue_cast(r, lkb, -EDEADLK);
4510 		break;
4511 
4512 	case -EINPROGRESS:
4513 		/* convert was queued on remote master */
4514 		receive_flags_reply(lkb, ms, local);
4515 		if (is_demoted(lkb))
4516 			munge_demoted(lkb);
4517 		del_lkb(r, lkb);
4518 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4519 		break;
4520 
4521 	case 0:
4522 		/* convert was granted on remote master */
4523 		receive_flags_reply(lkb, ms, local);
4524 		if (is_demoted(lkb))
4525 			munge_demoted(lkb);
4526 		grant_lock_pc(r, lkb, ms);
4527 		queue_cast(r, lkb, 0);
4528 		break;
4529 
4530 	default:
4531 		log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4532 			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4533 			  le32_to_cpu(ms->m_lkid),
4534 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4535 		dlm_print_rsb(r);
4536 		dlm_print_lkb(lkb);
4537 	}
4538 }
4539 
4540 static void _receive_convert_reply(struct dlm_lkb *lkb,
4541 				   const struct dlm_message *ms, bool local)
4542 {
4543 	struct dlm_rsb *r = lkb->lkb_resource;
4544 	int error;
4545 
4546 	hold_rsb(r);
4547 	lock_rsb(r);
4548 
4549 	error = validate_message(lkb, ms);
4550 	if (error)
4551 		goto out;
4552 
4553 	error = remove_from_waiters_ms(lkb, ms, local);
4554 	if (error)
4555 		goto out;
4556 
4557 	__receive_convert_reply(r, lkb, ms, local);
4558  out:
4559 	unlock_rsb(r);
4560 	put_rsb(r);
4561 }
4562 
4563 static int receive_convert_reply(struct dlm_ls *ls,
4564 				 const struct dlm_message *ms)
4565 {
4566 	struct dlm_lkb *lkb;
4567 	int error;
4568 
4569 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4570 	if (error)
4571 		return error;
4572 
4573 	_receive_convert_reply(lkb, ms, false);
4574 	dlm_put_lkb(lkb);
4575 	return 0;
4576 }
4577 
4578 static void _receive_unlock_reply(struct dlm_lkb *lkb,
4579 				  const struct dlm_message *ms, bool local)
4580 {
4581 	struct dlm_rsb *r = lkb->lkb_resource;
4582 	int error;
4583 
4584 	hold_rsb(r);
4585 	lock_rsb(r);
4586 
4587 	error = validate_message(lkb, ms);
4588 	if (error)
4589 		goto out;
4590 
4591 	error = remove_from_waiters_ms(lkb, ms, local);
4592 	if (error)
4593 		goto out;
4594 
4595 	/* this is the value returned from do_unlock() on the master */
4596 
4597 	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4598 	case -DLM_EUNLOCK:
4599 		receive_flags_reply(lkb, ms, local);
4600 		remove_lock_pc(r, lkb);
4601 		queue_cast(r, lkb, -DLM_EUNLOCK);
4602 		break;
4603 	case -ENOENT:
4604 		break;
4605 	default:
4606 		log_error(r->res_ls, "receive_unlock_reply %x error %d",
4607 			  lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
4608 	}
4609  out:
4610 	unlock_rsb(r);
4611 	put_rsb(r);
4612 }
4613 
4614 static int receive_unlock_reply(struct dlm_ls *ls,
4615 				const struct dlm_message *ms)
4616 {
4617 	struct dlm_lkb *lkb;
4618 	int error;
4619 
4620 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4621 	if (error)
4622 		return error;
4623 
4624 	_receive_unlock_reply(lkb, ms, false);
4625 	dlm_put_lkb(lkb);
4626 	return 0;
4627 }
4628 
4629 static void _receive_cancel_reply(struct dlm_lkb *lkb,
4630 				  const struct dlm_message *ms, bool local)
4631 {
4632 	struct dlm_rsb *r = lkb->lkb_resource;
4633 	int error;
4634 
4635 	hold_rsb(r);
4636 	lock_rsb(r);
4637 
4638 	error = validate_message(lkb, ms);
4639 	if (error)
4640 		goto out;
4641 
4642 	error = remove_from_waiters_ms(lkb, ms, local);
4643 	if (error)
4644 		goto out;
4645 
4646 	/* this is the value returned from do_cancel() on the master */
4647 
4648 	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4649 	case -DLM_ECANCEL:
4650 		receive_flags_reply(lkb, ms, local);
4651 		revert_lock_pc(r, lkb);
4652 		queue_cast(r, lkb, -DLM_ECANCEL);
4653 		break;
4654 	case 0:
4655 		break;
4656 	default:
4657 		log_error(r->res_ls, "receive_cancel_reply %x error %d",
4658 			  lkb->lkb_id,
4659 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4660 	}
4661  out:
4662 	unlock_rsb(r);
4663 	put_rsb(r);
4664 }
4665 
4666 static int receive_cancel_reply(struct dlm_ls *ls,
4667 				const struct dlm_message *ms)
4668 {
4669 	struct dlm_lkb *lkb;
4670 	int error;
4671 
4672 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4673 	if (error)
4674 		return error;
4675 
4676 	_receive_cancel_reply(lkb, ms, false);
4677 	dlm_put_lkb(lkb);
4678 	return 0;
4679 }
4680 
4681 static void receive_lookup_reply(struct dlm_ls *ls,
4682 				 const struct dlm_message *ms)
4683 {
4684 	struct dlm_lkb *lkb;
4685 	struct dlm_rsb *r;
4686 	int error, ret_nodeid;
4687 	int do_lookup_list = 0;
4688 
4689 	error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
4690 	if (error) {
4691 		log_error(ls, "%s no lkid %x", __func__,
4692 			  le32_to_cpu(ms->m_lkid));
4693 		return;
4694 	}
4695 
4696 	/* ms->m_result is the value returned by dlm_master_lookup on dir node
4697 	   FIXME: will a non-zero error ever be returned? */
4698 
4699 	r = lkb->lkb_resource;
4700 	hold_rsb(r);
4701 	lock_rsb(r);
4702 
4703 	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4704 	if (error)
4705 		goto out;
4706 
4707 	ret_nodeid = le32_to_cpu(ms->m_nodeid);
4708 
4709 	/* We sometimes receive a request from the dir node for this
4710 	   rsb before we've received the dir node's loookup_reply for it.
4711 	   The request from the dir node implies we're the master, so we set
4712 	   ourself as master in receive_request_reply, and verify here that
4713 	   we are indeed the master. */
4714 
4715 	if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4716 		/* This should never happen */
4717 		log_error(ls, "receive_lookup_reply %x from %d ret %d "
4718 			  "master %d dir %d our %d first %x %s",
4719 			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4720 			  ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
4721 			  dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4722 	}
4723 
4724 	if (ret_nodeid == dlm_our_nodeid()) {
4725 		r->res_master_nodeid = ret_nodeid;
4726 		r->res_nodeid = 0;
4727 		do_lookup_list = 1;
4728 		r->res_first_lkid = 0;
4729 	} else if (ret_nodeid == -1) {
4730 		/* the remote node doesn't believe it's the dir node */
4731 		log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4732 			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
4733 		r->res_master_nodeid = 0;
4734 		r->res_nodeid = -1;
4735 		lkb->lkb_nodeid = -1;
4736 	} else {
4737 		/* set_master() will set lkb_nodeid from r */
4738 		r->res_master_nodeid = ret_nodeid;
4739 		r->res_nodeid = ret_nodeid;
4740 	}
4741 
4742 	if (is_overlap(lkb)) {
4743 		log_debug(ls, "receive_lookup_reply %x unlock %x",
4744 			  lkb->lkb_id, dlm_iflags_val(lkb));
4745 		queue_cast_overlap(r, lkb);
4746 		unhold_lkb(lkb); /* undoes create_lkb() */
4747 		goto out_list;
4748 	}
4749 
4750 	_request_lock(r, lkb);
4751 
4752  out_list:
4753 	if (do_lookup_list)
4754 		process_lookup_list(r);
4755  out:
4756 	unlock_rsb(r);
4757 	put_rsb(r);
4758 	dlm_put_lkb(lkb);
4759 }
4760 
4761 static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4762 			     uint32_t saved_seq)
4763 {
4764 	int error = 0, noent = 0;
4765 
4766 	if (WARN_ON_ONCE(!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid)))) {
4767 		log_limit(ls, "receive %d from non-member %d %x %x %d",
4768 			  le32_to_cpu(ms->m_type),
4769 			  le32_to_cpu(ms->m_header.h_nodeid),
4770 			  le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4771 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4772 		return;
4773 	}
4774 
4775 	switch (ms->m_type) {
4776 
4777 	/* messages sent to a master node */
4778 
4779 	case cpu_to_le32(DLM_MSG_REQUEST):
4780 		error = receive_request(ls, ms);
4781 		break;
4782 
4783 	case cpu_to_le32(DLM_MSG_CONVERT):
4784 		error = receive_convert(ls, ms);
4785 		break;
4786 
4787 	case cpu_to_le32(DLM_MSG_UNLOCK):
4788 		error = receive_unlock(ls, ms);
4789 		break;
4790 
4791 	case cpu_to_le32(DLM_MSG_CANCEL):
4792 		noent = 1;
4793 		error = receive_cancel(ls, ms);
4794 		break;
4795 
4796 	/* messages sent from a master node (replies to above) */
4797 
4798 	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
4799 		error = receive_request_reply(ls, ms);
4800 		break;
4801 
4802 	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
4803 		error = receive_convert_reply(ls, ms);
4804 		break;
4805 
4806 	case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
4807 		error = receive_unlock_reply(ls, ms);
4808 		break;
4809 
4810 	case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
4811 		error = receive_cancel_reply(ls, ms);
4812 		break;
4813 
4814 	/* messages sent from a master node (only two types of async msg) */
4815 
4816 	case cpu_to_le32(DLM_MSG_GRANT):
4817 		noent = 1;
4818 		error = receive_grant(ls, ms);
4819 		break;
4820 
4821 	case cpu_to_le32(DLM_MSG_BAST):
4822 		noent = 1;
4823 		error = receive_bast(ls, ms);
4824 		break;
4825 
4826 	/* messages sent to a dir node */
4827 
4828 	case cpu_to_le32(DLM_MSG_LOOKUP):
4829 		receive_lookup(ls, ms);
4830 		break;
4831 
4832 	case cpu_to_le32(DLM_MSG_REMOVE):
4833 		receive_remove(ls, ms);
4834 		break;
4835 
4836 	/* messages sent from a dir node (remove has no reply) */
4837 
4838 	case cpu_to_le32(DLM_MSG_LOOKUP_REPLY):
4839 		receive_lookup_reply(ls, ms);
4840 		break;
4841 
4842 	/* other messages */
4843 
4844 	case cpu_to_le32(DLM_MSG_PURGE):
4845 		receive_purge(ls, ms);
4846 		break;
4847 
4848 	default:
4849 		log_error(ls, "unknown message type %d",
4850 			  le32_to_cpu(ms->m_type));
4851 	}
4852 
4853 	/*
4854 	 * When checking for ENOENT, we're checking the result of
4855 	 * find_lkb(m_remid):
4856 	 *
4857 	 * The lock id referenced in the message wasn't found.  This may
4858 	 * happen in normal usage for the async messages and cancel, so
4859 	 * only use log_debug for them.
4860 	 *
4861 	 * Some errors are expected and normal.
4862 	 */
4863 
4864 	if (error == -ENOENT && noent) {
4865 		log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4866 			  le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4867 			  le32_to_cpu(ms->m_header.h_nodeid),
4868 			  le32_to_cpu(ms->m_lkid), saved_seq);
4869 	} else if (error == -ENOENT) {
4870 		log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4871 			  le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4872 			  le32_to_cpu(ms->m_header.h_nodeid),
4873 			  le32_to_cpu(ms->m_lkid), saved_seq);
4874 
4875 		if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT))
4876 			dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash));
4877 	}
4878 
4879 	if (error == -EINVAL) {
4880 		log_error(ls, "receive %d inval from %d lkid %x remid %x "
4881 			  "saved_seq %u",
4882 			  le32_to_cpu(ms->m_type),
4883 			  le32_to_cpu(ms->m_header.h_nodeid),
4884 			  le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4885 			  saved_seq);
4886 	}
4887 }
4888 
4889 /* If the lockspace is in recovery mode (locking stopped), then normal
4890    messages are saved on the requestqueue for processing after recovery is
4891    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4892    messages off the requestqueue before we process new ones. This occurs right
4893    after recovery completes when we transition from saving all messages on
4894    requestqueue, to processing all the saved messages, to processing new
4895    messages as they arrive. */
4896 
4897 static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4898 				int nodeid)
4899 {
4900 try_again:
4901 	read_lock_bh(&ls->ls_requestqueue_lock);
4902 	if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4903 		/* If we were a member of this lockspace, left, and rejoined,
4904 		   other nodes may still be sending us messages from the
4905 		   lockspace generation before we left. */
4906 		if (WARN_ON_ONCE(!ls->ls_generation)) {
4907 			read_unlock_bh(&ls->ls_requestqueue_lock);
4908 			log_limit(ls, "receive %d from %d ignore old gen",
4909 				  le32_to_cpu(ms->m_type), nodeid);
4910 			return;
4911 		}
4912 
4913 		read_unlock_bh(&ls->ls_requestqueue_lock);
4914 		write_lock_bh(&ls->ls_requestqueue_lock);
4915 		/* recheck because we hold writelock now */
4916 		if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4917 			write_unlock_bh(&ls->ls_requestqueue_lock);
4918 			goto try_again;
4919 		}
4920 
4921 		dlm_add_requestqueue(ls, nodeid, ms);
4922 		write_unlock_bh(&ls->ls_requestqueue_lock);
4923 	} else {
4924 		_receive_message(ls, ms, 0);
4925 		read_unlock_bh(&ls->ls_requestqueue_lock);
4926 	}
4927 }
4928 
4929 /* This is called by dlm_recoverd to process messages that were saved on
4930    the requestqueue. */
4931 
4932 void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
4933 			       uint32_t saved_seq)
4934 {
4935 	_receive_message(ls, ms, saved_seq);
4936 }
4937 
4938 /* This is called by the midcomms layer when something is received for
4939    the lockspace.  It could be either a MSG (normal message sent as part of
4940    standard locking activity) or an RCOM (recovery message sent as part of
4941    lockspace recovery). */
4942 
4943 void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
4944 {
4945 	const struct dlm_header *hd = &p->header;
4946 	struct dlm_ls *ls;
4947 	int type = 0;
4948 
4949 	switch (hd->h_cmd) {
4950 	case DLM_MSG:
4951 		type = le32_to_cpu(p->message.m_type);
4952 		break;
4953 	case DLM_RCOM:
4954 		type = le32_to_cpu(p->rcom.rc_type);
4955 		break;
4956 	default:
4957 		log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4958 		return;
4959 	}
4960 
4961 	if (le32_to_cpu(hd->h_nodeid) != nodeid) {
4962 		log_print("invalid h_nodeid %d from %d lockspace %x",
4963 			  le32_to_cpu(hd->h_nodeid), nodeid,
4964 			  le32_to_cpu(hd->u.h_lockspace));
4965 		return;
4966 	}
4967 
4968 	ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace));
4969 	if (!ls) {
4970 		if (dlm_config.ci_log_debug) {
4971 			printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
4972 				"%u from %d cmd %d type %d\n",
4973 				le32_to_cpu(hd->u.h_lockspace), nodeid,
4974 				hd->h_cmd, type);
4975 		}
4976 
4977 		if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4978 			dlm_send_ls_not_ready(nodeid, &p->rcom);
4979 		return;
4980 	}
4981 
4982 	/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4983 	   be inactive (in this ls) before transitioning to recovery mode */
4984 
4985 	read_lock_bh(&ls->ls_recv_active);
4986 	if (hd->h_cmd == DLM_MSG)
4987 		dlm_receive_message(ls, &p->message, nodeid);
4988 	else if (hd->h_cmd == DLM_RCOM)
4989 		dlm_receive_rcom(ls, &p->rcom, nodeid);
4990 	else
4991 		log_error(ls, "invalid h_cmd %d from %d lockspace %x",
4992 			  hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
4993 	read_unlock_bh(&ls->ls_recv_active);
4994 
4995 	dlm_put_lockspace(ls);
4996 }
4997 
4998 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4999 				   struct dlm_message *ms_local)
5000 {
5001 	if (middle_conversion(lkb)) {
5002 		log_rinfo(ls, "%s %x middle convert in progress", __func__,
5003 			 lkb->lkb_id);
5004 
5005 		/* We sent this lock to the new master. The new master will
5006 		 * tell us when it's granted.  We no longer need a reply, so
5007 		 * use a fake reply to put the lkb into the right state.
5008 		 */
5009 		hold_lkb(lkb);
5010 		memset(ms_local, 0, sizeof(struct dlm_message));
5011 		ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
5012 		ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
5013 		ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5014 		_receive_convert_reply(lkb, ms_local, true);
5015 		unhold_lkb(lkb);
5016 
5017 	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
5018 		set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5019 	}
5020 
5021 	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5022 	   conversions are async; there's no reply from the remote master */
5023 }
5024 
5025 /* A waiting lkb needs recovery if the master node has failed, or
5026    the master node is changing (only when no directory is used) */
5027 
5028 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5029 				 int dir_nodeid)
5030 {
5031 	if (dlm_no_directory(ls))
5032 		return 1;
5033 
5034 	if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5035 		return 1;
5036 
5037 	return 0;
5038 }
5039 
5040 /* Recovery for locks that are waiting for replies from nodes that are now
5041    gone.  We can just complete unlocks and cancels by faking a reply from the
5042    dead node.  Requests and up-conversions we flag to be resent after
5043    recovery.  Down-conversions can just be completed with a fake reply like
5044    unlocks.  Conversions between PR and CW need special attention. */
5045 
5046 void dlm_recover_waiters_pre(struct dlm_ls *ls)
5047 {
5048 	struct dlm_lkb *lkb, *safe;
5049 	struct dlm_message *ms_local;
5050 	int wait_type, local_unlock_result, local_cancel_result;
5051 	int dir_nodeid;
5052 
5053 	ms_local = kmalloc(sizeof(*ms_local), GFP_KERNEL);
5054 	if (!ms_local)
5055 		return;
5056 
5057 	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5058 
5059 		dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5060 
5061 		/* exclude debug messages about unlocks because there can be so
5062 		   many and they aren't very interesting */
5063 
5064 		if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5065 			log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5066 				  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5067 				  lkb->lkb_id,
5068 				  lkb->lkb_remid,
5069 				  lkb->lkb_wait_type,
5070 				  lkb->lkb_resource->res_nodeid,
5071 				  lkb->lkb_nodeid,
5072 				  lkb->lkb_wait_nodeid,
5073 				  dir_nodeid);
5074 		}
5075 
5076 		/* all outstanding lookups, regardless of destination  will be
5077 		   resent after recovery is done */
5078 
5079 		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5080 			set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5081 			continue;
5082 		}
5083 
5084 		if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5085 			continue;
5086 
5087 		wait_type = lkb->lkb_wait_type;
5088 		local_unlock_result = -DLM_EUNLOCK;
5089 		local_cancel_result = -DLM_ECANCEL;
5090 
5091 		/* Main reply may have been received leaving a zero wait_type,
5092 		   but a reply for the overlapping op may not have been
5093 		   received.  In that case we need to fake the appropriate
5094 		   reply for the overlap op. */
5095 
5096 		if (!wait_type) {
5097 			if (is_overlap_cancel(lkb)) {
5098 				wait_type = DLM_MSG_CANCEL;
5099 				if (lkb->lkb_grmode == DLM_LOCK_IV)
5100 					local_cancel_result = 0;
5101 			}
5102 			if (is_overlap_unlock(lkb)) {
5103 				wait_type = DLM_MSG_UNLOCK;
5104 				if (lkb->lkb_grmode == DLM_LOCK_IV)
5105 					local_unlock_result = -ENOENT;
5106 			}
5107 
5108 			log_debug(ls, "rwpre overlap %x %x %d %d %d",
5109 				  lkb->lkb_id, dlm_iflags_val(lkb), wait_type,
5110 				  local_cancel_result, local_unlock_result);
5111 		}
5112 
5113 		switch (wait_type) {
5114 
5115 		case DLM_MSG_REQUEST:
5116 			set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5117 			break;
5118 
5119 		case DLM_MSG_CONVERT:
5120 			recover_convert_waiter(ls, lkb, ms_local);
5121 			break;
5122 
5123 		case DLM_MSG_UNLOCK:
5124 			hold_lkb(lkb);
5125 			memset(ms_local, 0, sizeof(struct dlm_message));
5126 			ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
5127 			ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result));
5128 			ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5129 			_receive_unlock_reply(lkb, ms_local, true);
5130 			dlm_put_lkb(lkb);
5131 			break;
5132 
5133 		case DLM_MSG_CANCEL:
5134 			hold_lkb(lkb);
5135 			memset(ms_local, 0, sizeof(struct dlm_message));
5136 			ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
5137 			ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result));
5138 			ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5139 			_receive_cancel_reply(lkb, ms_local, true);
5140 			dlm_put_lkb(lkb);
5141 			break;
5142 
5143 		default:
5144 			log_error(ls, "invalid lkb wait_type %d %d",
5145 				  lkb->lkb_wait_type, wait_type);
5146 		}
5147 		schedule();
5148 	}
5149 	kfree(ms_local);
5150 }
5151 
5152 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5153 {
5154 	struct dlm_lkb *lkb = NULL, *iter;
5155 
5156 	spin_lock_bh(&ls->ls_waiters_lock);
5157 	list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
5158 		if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
5159 			hold_lkb(iter);
5160 			lkb = iter;
5161 			break;
5162 		}
5163 	}
5164 	spin_unlock_bh(&ls->ls_waiters_lock);
5165 
5166 	return lkb;
5167 }
5168 
5169 /*
5170  * Forced state reset for locks that were in the middle of remote operations
5171  * when recovery happened (i.e. lkbs that were on the waiters list, waiting
5172  * for a reply from a remote operation.)  The lkbs remaining on the waiters
5173  * list need to be reevaluated; some may need resending to a different node
5174  * than previously, and some may now need local handling rather than remote.
5175  *
5176  * First, the lkb state for the voided remote operation is forcibly reset,
5177  * equivalent to what remove_from_waiters() would normally do:
5178  * . lkb removed from ls_waiters list
5179  * . lkb wait_type cleared
5180  * . lkb waiters_count cleared
5181  * . lkb ref count decremented for each waiters_count (almost always 1,
5182  *   but possibly 2 in case of cancel/unlock overlapping, which means
5183  *   two remote replies were being expected for the lkb.)
5184  *
5185  * Second, the lkb is reprocessed like an original operation would be,
5186  * by passing it to _request_lock or _convert_lock, which will either
5187  * process the lkb operation locally, or send it to a remote node again
5188  * and put the lkb back onto the waiters list.
5189  *
5190  * When reprocessing the lkb, we may find that it's flagged for an overlapping
5191  * force-unlock or cancel, either from before recovery began, or after recovery
5192  * finished.  If this is the case, the unlock/cancel is done directly, and the
5193  * original operation is not initiated again (no _request_lock/_convert_lock.)
5194  */
5195 
int dlm_recover_waiters_post(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error = 0, mstype, err, oc, ou;

	/*
	 * One RESEND-flagged lkb is handled per iteration.  Returns 0 once
	 * find_resend_waiter() finds nothing more, or -EINTR if
	 * dlm_locking_stopped() becomes true first.
	 */
	while (1) {
		if (dlm_locking_stopped(ls)) {
			log_debug(ls, "recover_waiters_post aborted");
			error = -EINTR;
			break;
		}

		/*
		 * Find an lkb from the waiters list that's been affected by
		 * recovery node changes, and needs to be reprocessed.  Does
		 * hold_lkb(), adding a refcount.
		 */
		lkb = find_resend_waiter(ls);
		if (!lkb)
			break;

		/* hold and lock the rsb for the whole reprocessing step */
		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		/*
		 * If the lkb has been flagged for a force unlock or cancel,
		 * then the reprocessing below will be replaced by just doing
		 * the unlock/cancel directly.
		 */
		/* mstype is saved here because wait_type is zeroed below */
		mstype = lkb->lkb_wait_type;
		oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
					&lkb->lkb_iflags);
		ou = test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT,
					&lkb->lkb_iflags);
		err = 0;

		log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
			  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
			  "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
			  r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
			  dlm_dir_nodeid(r), oc, ou);

		/*
		 * No reply to the pre-recovery operation will now be received,
		 * so a forced equivalent of remove_from_waiters() is needed to
		 * reset the waiters state that was in place before recovery.
		 */

		clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);

		/* Forcibly clear wait_type */
		lkb->lkb_wait_type = 0;

		/*
		 * Forcibly reset wait_count and associated refcount.  The
		 * wait_count will almost always be 1, but in case of an
		 * overlapping unlock/cancel it could be 2: see where
		 * add_to_waiters() finds the lkb is already on the waiters
		 * list and does lkb_wait_count++; hold_lkb().
		 */
		while (lkb->lkb_wait_count) {
			lkb->lkb_wait_count--;
			unhold_lkb(lkb);
		}

		/* Forcibly remove from waiters list */
		spin_lock_bh(&ls->ls_waiters_lock);
		list_del_init(&lkb->lkb_wait_reply);
		spin_unlock_bh(&ls->ls_waiters_lock);

		/*
		 * The lkb is now clear of all prior waiters state and can be
		 * processed locally, or sent to remote node again, or directly
		 * cancelled/unlocked.
		 */

		if (oc || ou) {
			/* do an unlock or cancel instead of resending */
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				/* unlock takes precedence when both overlap
				   flags were set */
				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
							-DLM_ECANCEL);
				unhold_lkb(lkb); /* undoes create_lkb() */
				break;
			case DLM_MSG_CONVERT:
				if (oc) {
					queue_cast(r, lkb, -DLM_ECANCEL);
				} else {
					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
					_unlock_lock(r, lkb);
				}
				break;
			default:
				/* unexpected wait type, reported below */
				err = 1;
			}
		} else {
			/* no overlap: redo the original operation */
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				_request_lock(r, lkb);
				if (r->res_nodeid != -1 && is_master(r))
					confirm_master(r, 0);
				break;
			case DLM_MSG_CONVERT:
				_convert_lock(r, lkb);
				break;
			default:
				/* unexpected wait type, reported below */
				err = 1;
			}
		}

		if (err) {
			log_error(ls, "waiter %x msg %d r_nodeid %d "
				  "dir_nodeid %d overlap %d %d",
				  lkb->lkb_id, mstype, r->res_nodeid,
				  dlm_dir_nodeid(r), oc, ou);
		}
		unlock_rsb(r);
		put_rsb(r);
		/* drops the reference taken by find_resend_waiter() */
		dlm_put_lkb(lkb);
	}

	return error;
}
5323 
5324 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5325 			      struct list_head *list)
5326 {
5327 	struct dlm_lkb *lkb, *safe;
5328 
5329 	list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5330 		if (!is_master_copy(lkb))
5331 			continue;
5332 
5333 		/* don't purge lkbs we've added in recover_master_copy for
5334 		   the current recovery seq */
5335 
5336 		if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5337 			continue;
5338 
5339 		del_lkb(r, lkb);
5340 
5341 		/* this put should free the lkb */
5342 		if (!dlm_put_lkb(lkb))
5343 			log_error(ls, "purged mstcpy lkb not released");
5344 	}
5345 }
5346 
5347 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5348 {
5349 	struct dlm_ls *ls = r->res_ls;
5350 
5351 	purge_mstcpy_list(ls, r, &r->res_grantqueue);
5352 	purge_mstcpy_list(ls, r, &r->res_convertqueue);
5353 	purge_mstcpy_list(ls, r, &r->res_waitqueue);
5354 }
5355 
5356 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5357 			    struct list_head *list,
5358 			    int nodeid_gone, unsigned int *count)
5359 {
5360 	struct dlm_lkb *lkb, *safe;
5361 
5362 	list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5363 		if (!is_master_copy(lkb))
5364 			continue;
5365 
5366 		if ((lkb->lkb_nodeid == nodeid_gone) ||
5367 		    dlm_is_removed(ls, lkb->lkb_nodeid)) {
5368 
5369 			/* tell recover_lvb to invalidate the lvb
5370 			   because a node holding EX/PW failed */
5371 			if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5372 			    (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5373 				rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5374 			}
5375 
5376 			del_lkb(r, lkb);
5377 
5378 			/* this put should free the lkb */
5379 			if (!dlm_put_lkb(lkb))
5380 				log_error(ls, "purged dead lkb not released");
5381 
5382 			rsb_set_flag(r, RSB_RECOVER_GRANT);
5383 
5384 			(*count)++;
5385 		}
5386 	}
5387 }
5388 
5389 /* Get rid of locks held by nodes that are gone. */
5390 
5391 void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list)
5392 {
5393 	struct dlm_rsb *r;
5394 	struct dlm_member *memb;
5395 	int nodes_count = 0;
5396 	int nodeid_gone = 0;
5397 	unsigned int lkb_count = 0;
5398 
5399 	/* cache one removed nodeid to optimize the common
5400 	   case of a single node removed */
5401 
5402 	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5403 		nodes_count++;
5404 		nodeid_gone = memb->nodeid;
5405 	}
5406 
5407 	if (!nodes_count)
5408 		return;
5409 
5410 	list_for_each_entry(r, root_list, res_root_list) {
5411 		lock_rsb(r);
5412 		if (r->res_nodeid != -1 && is_master(r)) {
5413 			purge_dead_list(ls, r, &r->res_grantqueue,
5414 					nodeid_gone, &lkb_count);
5415 			purge_dead_list(ls, r, &r->res_convertqueue,
5416 					nodeid_gone, &lkb_count);
5417 			purge_dead_list(ls, r, &r->res_waitqueue,
5418 					nodeid_gone, &lkb_count);
5419 		}
5420 		unlock_rsb(r);
5421 
5422 		cond_resched();
5423 	}
5424 
5425 	if (lkb_count)
5426 		log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5427 			  lkb_count, nodes_count);
5428 }
5429 
5430 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
5431 {
5432 	struct dlm_rsb *r;
5433 
5434 	read_lock_bh(&ls->ls_rsbtbl_lock);
5435 	list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
5436 		if (!rsb_flag(r, RSB_RECOVER_GRANT))
5437 			continue;
5438 		if (!is_master(r)) {
5439 			rsb_clear_flag(r, RSB_RECOVER_GRANT);
5440 			continue;
5441 		}
5442 		hold_rsb(r);
5443 		read_unlock_bh(&ls->ls_rsbtbl_lock);
5444 		return r;
5445 	}
5446 	read_unlock_bh(&ls->ls_rsbtbl_lock);
5447 	return NULL;
5448 }
5449 
5450 /*
5451  * Attempt to grant locks on resources that we are the master of.
5452  * Locks may have become grantable during recovery because locks
5453  * from departed nodes have been purged (or not rebuilt), allowing
5454  * previously blocked locks to now be granted.  The subset of rsb's
5455  * we are interested in are those with lkb's on either the convert or
5456  * waiting queues.
5457  *
5458  * Simplest would be to go through each master rsb and check for non-empty
5459  * convert or waiting queues, and attempt to grant on those rsbs.
5460  * Checking the queues requires lock_rsb, though, for which we'd need
5461  * to release the rsbtbl lock.  This would make iterating through all
5462  * rsb's very inefficient.  So, we rely on earlier recovery routines
5463  * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5464  * locks for.
5465  */
5466 
5467 void dlm_recover_grant(struct dlm_ls *ls)
5468 {
5469 	struct dlm_rsb *r;
5470 	unsigned int count = 0;
5471 	unsigned int rsb_count = 0;
5472 	unsigned int lkb_count = 0;
5473 
5474 	while (1) {
5475 		r = find_grant_rsb(ls);
5476 		if (!r)
5477 			break;
5478 
5479 		rsb_count++;
5480 		count = 0;
5481 		lock_rsb(r);
5482 		/* the RECOVER_GRANT flag is checked in the grant path */
5483 		grant_pending_locks(r, &count);
5484 		rsb_clear_flag(r, RSB_RECOVER_GRANT);
5485 		lkb_count += count;
5486 		confirm_master(r, 0);
5487 		unlock_rsb(r);
5488 		put_rsb(r);
5489 		cond_resched();
5490 	}
5491 
5492 	if (lkb_count)
5493 		log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5494 			  lkb_count, rsb_count);
5495 }
5496 
5497 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5498 					 uint32_t remid)
5499 {
5500 	struct dlm_lkb *lkb;
5501 
5502 	list_for_each_entry(lkb, head, lkb_statequeue) {
5503 		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5504 			return lkb;
5505 	}
5506 	return NULL;
5507 }
5508 
5509 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5510 				    uint32_t remid)
5511 {
5512 	struct dlm_lkb *lkb;
5513 
5514 	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5515 	if (lkb)
5516 		return lkb;
5517 	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5518 	if (lkb)
5519 		return lkb;
5520 	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5521 	if (lkb)
5522 		return lkb;
5523 	return NULL;
5524 }
5525 
5526 /* needs at least dlm_rcom + rcom_lock */
5527 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5528 				  struct dlm_rsb *r, const struct dlm_rcom *rc)
5529 {
5530 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5531 
5532 	lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5533 	lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5534 	lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5535 	lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5536 	dlm_set_dflags_val(lkb, le32_to_cpu(rl->rl_flags));
5537 	set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
5538 	lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5539 	lkb->lkb_rqmode = rl->rl_rqmode;
5540 	lkb->lkb_grmode = rl->rl_grmode;
5541 	/* don't set lkb_status because add_lkb wants to itself */
5542 
5543 	lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5544 	lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5545 
5546 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5547 		int lvblen = le16_to_cpu(rc->rc_header.h_length) -
5548 			sizeof(struct dlm_rcom) - sizeof(struct rcom_lock);
5549 		if (lvblen > ls->ls_lvblen)
5550 			return -EINVAL;
5551 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5552 		if (!lkb->lkb_lvbptr)
5553 			return -ENOMEM;
5554 		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5555 	}
5556 
5557 	/* Conversions between PR and CW (middle modes) need special handling.
5558 	   The real granted mode of these converting locks cannot be determined
5559 	   until all locks have been rebuilt on the rsb (recover_conversion) */
5560 
5561 	if (rl->rl_status == DLM_LKSTS_CONVERT && middle_conversion(lkb)) {
5562 		/* We may need to adjust grmode depending on other granted locks. */
5563 		log_limit(ls, "%s %x middle convert gr %d rq %d remote %d %x",
5564 			  __func__, lkb->lkb_id, lkb->lkb_grmode,
5565 			  lkb->lkb_rqmode, lkb->lkb_nodeid, lkb->lkb_remid);
5566 		rsb_set_flag(r, RSB_RECOVER_CONVERT);
5567 	}
5568 
5569 	return 0;
5570 }
5571 
5572 /* This lkb may have been recovered in a previous aborted recovery so we need
5573    to check if the rsb already has an lkb with the given remote nodeid/lkid.
5574    If so we just send back a standard reply.  If not, we create a new lkb with
5575    the given values and send back our lkid.  We send back our lkid by sending
5576    back the rcom_lock struct we got but with the remid field filled in. */
5577 
/*
 * Needs at least dlm_rcom + rcom_lock.
 *
 * @rl_remid:  out; starts as the rl_remid found in the message and is
 *             replaced by our lkb_id once an lkb has been found or created
 * @rl_result: out; always receives the final error value (little endian)
 *
 * Returns 0 when a new lkb was created and added to the rsb, -EEXIST when
 * the rsb already had an lkb for this remote nodeid/lkid, -EOPNOTSUPP when
 * rl_parent_lkid is set, -EBADR when dlm_no_directory() is true and we are
 * not the directory node of the rsb, or an error from find_rsb(),
 * create_lkb() or receive_rcom_lock_args().
 */
int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
			    __le32 *rl_remid, __le32 *rl_result)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	uint32_t remid = 0;
	int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
	int error;

	/* init rl_remid with rcom lock rl_remid */
	*rl_remid = rl->rl_remid;

	if (rl->rl_parent_lkid) {
		error = -EOPNOTSUPP;
		goto out;
	}

	remid = le32_to_cpu(rl->rl_lkid);

	/* In general we expect the rsb returned to be R_MASTER, but we don't
	   have to require it.  Recovery of masters on one node can overlap
	   recovery of locks on another node, so one node can send us MSTCPY
	   locks before we've made ourselves master of this rsb.  We can still
	   add new MSTCPY locks that we receive here without any harm; when
	   we make ourselves master, dlm_recover_masters() won't touch the
	   MSTCPY locks we've received early. */

	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
			 from_nodeid, R_RECEIVE_RECOVER, &r);
	if (error)
		goto out;

	lock_rsb(r);

	if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
		log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
			  from_nodeid, remid);
		error = -EBADR;
		goto out_unlock;
	}

	/* already have this lock: skip creation but still report our lkid
	   and refresh lkb_recover_seq at out_remid */
	lkb = search_remid(r, from_nodeid, remid);
	if (lkb) {
		error = -EEXIST;
		goto out_remid;
	}

	error = create_lkb(ls, &lkb);
	if (error)
		goto out_unlock;

	error = receive_rcom_lock_args(ls, lkb, r, rc);
	if (error) {
		/* drop the lkb that was just created */
		__put_lkb(ls, lkb);
		goto out_unlock;
	}

	attach_lkb(r, lkb);
	add_lkb(r, lkb, rl->rl_status);
	ls->ls_recover_locks_in++;

	if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
		rsb_set_flag(r, RSB_RECOVER_GRANT);

 out_remid:
	/* this is the new value returned to the lock holder for
	   saving in its process-copy lkb */
	*rl_remid = cpu_to_le32(lkb->lkb_id);

	lkb->lkb_recover_seq = ls->ls_recover_seq;

 out_unlock:
	unlock_rsb(r);
	put_rsb(r);
 out:
	/* -EEXIST is not logged */
	if (error && error != -EEXIST)
		log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
			  from_nodeid, remid, error);
	*rl_result = cpu_to_le32(error);
	return error;
}
5661 
/*
 * Needs at least dlm_rcom + rcom_lock.
 *
 * Handles the reply for one lock we sent with dlm_send_rcom_lock(): the
 * message carries our lkid (rl_lkid), the remote lkid (rl_remid) and the
 * remote result (rl_result).
 *
 * Returns the find_lkb() error if our lkid is unknown, -EINVAL if the lkb
 * is not a process copy, and 0 otherwise (including when the remote result
 * is an unrecognized value, which is only logged).
 */
int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
			     uint64_t seq)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	uint32_t lkid, remid;
	int error, result;

	lkid = le32_to_cpu(rl->rl_lkid);
	remid = le32_to_cpu(rl->rl_remid);
	result = le32_to_cpu(rl->rl_result);

	/* the lkb found here is put again before every return below */
	error = find_lkb(ls, lkid, &lkb);
	if (error) {
		log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
			  result);
		return error;
	}

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	if (!is_process_copy(lkb)) {
		log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
			  result);
		dlm_dump_rsb(r);
		unlock_rsb(r);
		put_rsb(r);
		dlm_put_lkb(lkb);
		return -EINVAL;
	}

	switch (result) {
	case -EBADR:
		/* There's a chance the new master received our lock before
		   dlm_recover_master_reply(), this wouldn't happen if we did
		   a barrier between recover_masters and recover_locks. */

		log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
			  result);

		/* send the lock again; it is not counted as recovered yet,
		   so dlm_recovered_lock() is skipped */
		dlm_send_rcom_lock(r, lkb, seq);
		goto out;
	case -EEXIST:
	case 0:
		/* remember the lkid the remote node uses for this lock */
		lkb->lkb_remid = remid;
		break;
	default:
		log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
			  result);
	}

	/* an ack for dlm_recover_locks() which waits for replies from
	   all the locks it sends to new masters */
	dlm_recovered_lock(r);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);

	return 0;
}
5731 
/*
 * Create a new lkb for a userspace lock request and hand it to
 * request_lock(), all under dlm_lock_recovery().
 *
 * Ownership of @ua: it is kfree()d here on every failure that occurs before
 * request_lock() is called; from then on it is left to the lkb (see the
 * comment ahead of set_bit() below).
 *
 * Returns 0 on success.  -EINPROGRESS and -EAGAIN from request_lock() are
 * also reported as 0, but only for 0/-EINPROGRESS is the lkb added to
 * ua->proc->locks; for every other result the lkb is released with
 * __put_lkb().
 */
int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
		     int mode, uint32_t flags, void *name, unsigned int namelen)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	/* cleared only once the lkb is on the proc locks list */
	bool do_put = true;
	int error;

	dlm_lock_recovery(ls);

	error = create_lkb(ls, &lkb);
	if (error) {
		kfree(ua);
		goto out;
	}

	trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);

	if (flags & DLM_LKF_VALBLK) {
		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
		if (!ua->lksb.sb_lvbptr) {
			kfree(ua);
			error = -ENOMEM;
			goto out_put;
		}
	}
	error = set_lock_args(mode, &ua->lksb, flags, namelen, fake_astfn, ua,
			      fake_bastfn, &args);
	if (error) {
		/* kfree(NULL) is fine when no lvb was allocated above */
		kfree(ua->lksb.sb_lvbptr);
		ua->lksb.sb_lvbptr = NULL;
		kfree(ua);
		goto out_put;
	}

	/* After ua is attached to lkb it will be freed by dlm_free_lkb().
	   When DLM_DFL_USER_BIT is set, the dlm knows that this is a userspace
	   lock and that lkb_astparam is the dlm_user_args structure. */
	set_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags);
	error = request_lock(ls, lkb, name, namelen, &args);

	switch (error) {
	case 0:
		break;
	case -EINPROGRESS:
		error = 0;
		break;
	case -EAGAIN:
		/* reported as success, but the lkb is still dropped */
		error = 0;
		fallthrough;
	default:
		goto out_put;
	}

	/* add this new lkb to the per-process list of locks */
	spin_lock_bh(&ua->proc->locks_spin);
	hold_lkb(lkb);
	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
	spin_unlock_bh(&ua->proc->locks_spin);
	do_put = false;
 out_put:
	trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
	if (do_put)
		__put_lkb(ls, lkb);
 out:
	dlm_unlock_recovery(ls);
	return error;
}
5800 
/*
 * Convert an existing userspace lock identified by @lkid.
 *
 * @ua_tmp only carries the new callback parameters; they are copied into
 * the dlm_user_args already attached to the lkb, and @ua_tmp is always
 * freed before returning.  If @lvb_in is given and the lock has an lvb
 * buffer, DLM_USER_LVB_LEN bytes are copied into it.
 *
 * Returns 0 on success; -EINPROGRESS, -EAGAIN and -EDEADLK from
 * convert_lock() are also reported as 0.  Other errors come from
 * find_lkb(), the lvb allocation (-ENOMEM), set_lock_args() or
 * convert_lock().
 */
int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	dlm_lock_recovery(ls);

	/* the lkb found here is put again at out_put */
	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	trace_dlm_lock_start(ls, lkb, NULL, 0, mode, flags);

	/* user can change the params on its lock when it converts it, or
	   add an lvb that didn't exist before */

	ua = lkb->lkb_ua;

	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
		if (!ua->lksb.sb_lvbptr) {
			error = -ENOMEM;
			goto out_put;
		}
	}
	if (lvb_in && ua->lksb.sb_lvbptr)
		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);

	ua->xid = ua_tmp->xid;
	ua->castparam = ua_tmp->castparam;
	ua->castaddr = ua_tmp->castaddr;
	ua->bastparam = ua_tmp->bastparam;
	ua->bastaddr = ua_tmp->bastaddr;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_lock_args(mode, &ua->lksb, flags, 0, fake_astfn, ua,
			      fake_bastfn, &args);
	if (error)
		goto out_put;

	error = convert_lock(ls, lkb, &args);

	/* these results are not reported to the caller as failures */
	if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
		error = 0;
 out_put:
	trace_dlm_lock_end(ls, lkb, NULL, 0, mode, flags, error, false);
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}
5856 
5857 /*
5858  * The caller asks for an orphan lock on a given resource with a given mode.
5859  * If a matching lock exists, it's moved to the owner's list of locks and
5860  * the lkid is returned.
5861  */
5862 
5863 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5864 		     int mode, uint32_t flags, void *name, unsigned int namelen,
5865 		     uint32_t *lkid)
5866 {
5867 	struct dlm_lkb *lkb = NULL, *iter;
5868 	struct dlm_user_args *ua;
5869 	int found_other_mode = 0;
5870 	int rv = 0;
5871 
5872 	spin_lock_bh(&ls->ls_orphans_lock);
5873 	list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
5874 		if (iter->lkb_resource->res_length != namelen)
5875 			continue;
5876 		if (memcmp(iter->lkb_resource->res_name, name, namelen))
5877 			continue;
5878 		if (iter->lkb_grmode != mode) {
5879 			found_other_mode = 1;
5880 			continue;
5881 		}
5882 
5883 		lkb = iter;
5884 		list_del_init(&iter->lkb_ownqueue);
5885 		clear_bit(DLM_DFL_ORPHAN_BIT, &iter->lkb_dflags);
5886 		*lkid = iter->lkb_id;
5887 		break;
5888 	}
5889 	spin_unlock_bh(&ls->ls_orphans_lock);
5890 
5891 	if (!lkb && found_other_mode) {
5892 		rv = -EAGAIN;
5893 		goto out;
5894 	}
5895 
5896 	if (!lkb) {
5897 		rv = -ENOENT;
5898 		goto out;
5899 	}
5900 
5901 	lkb->lkb_exflags = flags;
5902 	lkb->lkb_ownpid = (int) current->pid;
5903 
5904 	ua = lkb->lkb_ua;
5905 
5906 	ua->proc = ua_tmp->proc;
5907 	ua->xid = ua_tmp->xid;
5908 	ua->castparam = ua_tmp->castparam;
5909 	ua->castaddr = ua_tmp->castaddr;
5910 	ua->bastparam = ua_tmp->bastparam;
5911 	ua->bastaddr = ua_tmp->bastaddr;
5912 	ua->user_lksb = ua_tmp->user_lksb;
5913 
5914 	/*
5915 	 * The lkb reference from the ls_orphans list was not
5916 	 * removed above, and is now considered the reference
5917 	 * for the proc locks list.
5918 	 */
5919 
5920 	spin_lock_bh(&ua->proc->locks_spin);
5921 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5922 	spin_unlock_bh(&ua->proc->locks_spin);
5923  out:
5924 	kfree(ua_tmp);
5925 	return rv;
5926 }
5927 
/*
 * Unlock a userspace lock identified by @lkid.
 *
 * @ua_tmp carries optional updates (castparam if non-zero, user_lksb) that
 * are copied into the lkb's own dlm_user_args; @ua_tmp is always freed
 * before returning.  If @lvb_in is given and the lock has an lvb buffer,
 * DLM_USER_LVB_LEN bytes are copied into it.
 *
 * Returns 0 on success.  -DLM_EUNLOCK from unlock_lock() is reported as 0,
 * as is -EBUSY when DLM_LKF_FORCEUNLOCK was requested.  On success the lkb
 * is moved to the proc->unlocking list if it is still on an owner list.
 */
int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		    uint32_t flags, uint32_t lkid, char *lvb_in)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	dlm_lock_recovery(ls);

	/* the lkb found here is put again at out_put */
	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	trace_dlm_unlock_start(ls, lkb, flags);

	ua = lkb->lkb_ua;

	if (lvb_in && ua->lksb.sb_lvbptr)
		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
	/* a zero castparam leaves the existing one in place */
	if (ua_tmp->castparam)
		ua->castparam = ua_tmp->castparam;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	error = unlock_lock(ls, lkb, &args);

	if (error == -DLM_EUNLOCK)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
		error = 0;
	if (error)
		goto out_put;

	spin_lock_bh(&ua->proc->locks_spin);
	/* dlm_user_add_cb() may have already taken lkb off the proc list */
	if (!list_empty(&lkb->lkb_ownqueue))
		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
	spin_unlock_bh(&ua->proc->locks_spin);
 out_put:
	trace_dlm_unlock_end(ls, lkb, flags, error);
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}
5979 
5980 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5981 		    uint32_t flags, uint32_t lkid)
5982 {
5983 	struct dlm_lkb *lkb;
5984 	struct dlm_args args;
5985 	struct dlm_user_args *ua;
5986 	int error;
5987 
5988 	dlm_lock_recovery(ls);
5989 
5990 	error = find_lkb(ls, lkid, &lkb);
5991 	if (error)
5992 		goto out;
5993 
5994 	trace_dlm_unlock_start(ls, lkb, flags);
5995 
5996 	ua = lkb->lkb_ua;
5997 	if (ua_tmp->castparam)
5998 		ua->castparam = ua_tmp->castparam;
5999 	ua->user_lksb = ua_tmp->user_lksb;
6000 
6001 	error = set_unlock_args(flags, ua, &args);
6002 	if (error)
6003 		goto out_put;
6004 
6005 	error = cancel_lock(ls, lkb, &args);
6006 
6007 	if (error == -DLM_ECANCEL)
6008 		error = 0;
6009 	/* from validate_unlock_args() */
6010 	if (error == -EBUSY)
6011 		error = 0;
6012  out_put:
6013 	trace_dlm_unlock_end(ls, lkb, flags, error);
6014 	dlm_put_lkb(lkb);
6015  out:
6016 	dlm_unlock_recovery(ls);
6017 	kfree(ua_tmp);
6018 	return error;
6019 }
6020 
/*
 * Cancel the lock identified by @lkid, marking it with
 * DLM_IFL_DEADLOCK_CANCEL_BIT before _cancel_lock() is called.
 *
 * Returns 0 on success; -DLM_ECANCEL and -EBUSY are also reported as 0.
 * Other errors come from find_lkb(), set_unlock_args(),
 * validate_unlock_args() or _cancel_lock().
 */
int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	struct dlm_rsb *r;
	int error;

	dlm_lock_recovery(ls);

	/* the lkb found here is put again at out_put */
	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	trace_dlm_unlock_start(ls, lkb, flags);

	ua = lkb->lkb_ua;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, &args);
	if (error)
		goto out_r;
	set_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags);

	error = _cancel_lock(r, lkb);
 out_r:
	unlock_rsb(r);
	put_rsb(r);

	/* these results are not reported to the caller as failures */
	if (error == -DLM_ECANCEL)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY)
		error = 0;
 out_put:
	trace_dlm_unlock_end(ls, lkb, flags, error);
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	return error;
}
6071 
6072 /* lkb's that are removed from the waiters list by revert are just left on the
6073    orphans list with the granted orphan locks, to be freed by purge */
6074 
6075 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6076 {
6077 	struct dlm_args args;
6078 	int error;
6079 
6080 	hold_lkb(lkb); /* reference for the ls_orphans list */
6081 	spin_lock_bh(&ls->ls_orphans_lock);
6082 	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6083 	spin_unlock_bh(&ls->ls_orphans_lock);
6084 
6085 	set_unlock_args(0, lkb->lkb_ua, &args);
6086 
6087 	error = cancel_lock(ls, lkb, &args);
6088 	if (error == -DLM_ECANCEL)
6089 		error = 0;
6090 	return error;
6091 }
6092 
6093 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
6094    granted.  Regardless of what rsb queue the lock is on, it's removed and
6095    freed.  The IVVALBLK flag causes the lvb on the resource to be invalidated
6096    if our lock is PW/EX (it's ignored if our granted mode is smaller.) */
6097 
6098 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6099 {
6100 	struct dlm_args args;
6101 	int error;
6102 
6103 	set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
6104 			lkb->lkb_ua, &args);
6105 
6106 	error = unlock_lock(ls, lkb, &args);
6107 	if (error == -DLM_EUNLOCK)
6108 		error = 0;
6109 	return error;
6110 }
6111 
/* We have to release the ls_clear_proc_locks spinlock before calling
   unlock_proc_lock() (which does lock_rsb) due to deadlock with receiving a
   message that does lock_rsb followed by dlm_user_add_cb() */
6115 
6116 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
6117 				     struct dlm_user_proc *proc)
6118 {
6119 	struct dlm_lkb *lkb = NULL;
6120 
6121 	spin_lock_bh(&ls->ls_clear_proc_locks);
6122 	if (list_empty(&proc->locks))
6123 		goto out;
6124 
6125 	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
6126 	list_del_init(&lkb->lkb_ownqueue);
6127 
6128 	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6129 		set_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags);
6130 	else
6131 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6132  out:
6133 	spin_unlock_bh(&ls->ls_clear_proc_locks);
6134 	return lkb;
6135 }
6136 
/* The ls_clear_proc_locks spinlock protects against dlm_user_add_cb() which
   1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
   which we clear here. */
6140 
/* The proc CLOSING flag is set, so no more device_reads should look at the
   proc->asts list, and no more device_writes should add lkbs to the
   proc->locks list; so we shouldn't need to take asts_spin or locks_spin
   here.  This assumes that device reads/writes/closes are serialized --
   FIXME: we may need to serialize them ourselves. */
6146 
6147 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6148 {
6149 	struct dlm_callback *cb, *cb_safe;
6150 	struct dlm_lkb *lkb, *safe;
6151 
6152 	dlm_lock_recovery(ls);
6153 
6154 	while (1) {
6155 		lkb = del_proc_lock(ls, proc);
6156 		if (!lkb)
6157 			break;
6158 		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6159 			orphan_proc_lock(ls, lkb);
6160 		else
6161 			unlock_proc_lock(ls, lkb);
6162 
6163 		/* this removes the reference for the proc->locks list
6164 		   added by dlm_user_request, it may result in the lkb
6165 		   being freed */
6166 
6167 		dlm_put_lkb(lkb);
6168 	}
6169 
6170 	spin_lock_bh(&ls->ls_clear_proc_locks);
6171 
6172 	/* in-progress unlocks */
6173 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6174 		list_del_init(&lkb->lkb_ownqueue);
6175 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6176 		dlm_put_lkb(lkb);
6177 	}
6178 
6179 	list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6180 		list_del(&cb->list);
6181 		dlm_free_cb(cb);
6182 	}
6183 
6184 	spin_unlock_bh(&ls->ls_clear_proc_locks);
6185 	dlm_unlock_recovery(ls);
6186 }
6187 
6188 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6189 {
6190 	struct dlm_callback *cb, *cb_safe;
6191 	struct dlm_lkb *lkb, *safe;
6192 
6193 	while (1) {
6194 		lkb = NULL;
6195 		spin_lock_bh(&proc->locks_spin);
6196 		if (!list_empty(&proc->locks)) {
6197 			lkb = list_entry(proc->locks.next, struct dlm_lkb,
6198 					 lkb_ownqueue);
6199 			list_del_init(&lkb->lkb_ownqueue);
6200 		}
6201 		spin_unlock_bh(&proc->locks_spin);
6202 
6203 		if (!lkb)
6204 			break;
6205 
6206 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6207 		unlock_proc_lock(ls, lkb);
6208 		dlm_put_lkb(lkb); /* ref from proc->locks list */
6209 	}
6210 
6211 	spin_lock_bh(&proc->locks_spin);
6212 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6213 		list_del_init(&lkb->lkb_ownqueue);
6214 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6215 		dlm_put_lkb(lkb);
6216 	}
6217 	spin_unlock_bh(&proc->locks_spin);
6218 
6219 	spin_lock_bh(&proc->asts_spin);
6220 	list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6221 		list_del(&cb->list);
6222 		dlm_free_cb(cb);
6223 	}
6224 	spin_unlock_bh(&proc->asts_spin);
6225 }
6226 
6227 /* pid of 0 means purge all orphans */
6228 
6229 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6230 {
6231 	struct dlm_lkb *lkb, *safe;
6232 
6233 	spin_lock_bh(&ls->ls_orphans_lock);
6234 	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6235 		if (pid && lkb->lkb_ownpid != pid)
6236 			continue;
6237 		unlock_proc_lock(ls, lkb);
6238 		list_del_init(&lkb->lkb_ownqueue);
6239 		dlm_put_lkb(lkb);
6240 	}
6241 	spin_unlock_bh(&ls->ls_orphans_lock);
6242 }
6243 
6244 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6245 {
6246 	struct dlm_message *ms;
6247 	struct dlm_mhandle *mh;
6248 	int error;
6249 
6250 	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6251 				DLM_MSG_PURGE, &ms, &mh);
6252 	if (error)
6253 		return error;
6254 	ms->m_nodeid = cpu_to_le32(nodeid);
6255 	ms->m_pid = cpu_to_le32(pid);
6256 
6257 	return send_message(mh, ms, NULL, 0);
6258 }
6259 
6260 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6261 		   int nodeid, int pid)
6262 {
6263 	int error = 0;
6264 
6265 	if (nodeid && (nodeid != dlm_our_nodeid())) {
6266 		error = send_purge(ls, nodeid, pid);
6267 	} else {
6268 		dlm_lock_recovery(ls);
6269 		if (pid == current->pid)
6270 			purge_proc_locks(ls, proc);
6271 		else
6272 			do_purge(ls, nodeid, pid);
6273 		dlm_unlock_recovery(ls);
6274 	}
6275 	return error;
6276 }
6277 
6278 /* debug functionality */
6279 int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
6280 		      int lkb_nodeid, unsigned int lkb_dflags, int lkb_status)
6281 {
6282 	struct dlm_lksb *lksb;
6283 	struct dlm_lkb *lkb;
6284 	struct dlm_rsb *r;
6285 	int error;
6286 
6287 	/* we currently can't set a valid user lock */
6288 	if (lkb_dflags & BIT(DLM_DFL_USER_BIT))
6289 		return -EOPNOTSUPP;
6290 
6291 	lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
6292 	if (!lksb)
6293 		return -ENOMEM;
6294 
6295 	error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
6296 	if (error) {
6297 		kfree(lksb);
6298 		return error;
6299 	}
6300 
6301 	dlm_set_dflags_val(lkb, lkb_dflags);
6302 	lkb->lkb_nodeid = lkb_nodeid;
6303 	lkb->lkb_lksb = lksb;
6304 	/* user specific pointer, just don't have it NULL for kernel locks */
6305 	if (~lkb_dflags & BIT(DLM_DFL_USER_BIT))
6306 		lkb->lkb_astparam = (void *)0xDEADBEEF;
6307 
6308 	error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
6309 	if (error) {
6310 		kfree(lksb);
6311 		__put_lkb(ls, lkb);
6312 		return error;
6313 	}
6314 
6315 	lock_rsb(r);
6316 	attach_lkb(r, lkb);
6317 	add_lkb(r, lkb, lkb_status);
6318 	unlock_rsb(r);
6319 	put_rsb(r);
6320 
6321 	return 0;
6322 }
6323 
6324 int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
6325 				 int mstype, int to_nodeid)
6326 {
6327 	struct dlm_lkb *lkb;
6328 	int error;
6329 
6330 	error = find_lkb(ls, lkb_id, &lkb);
6331 	if (error)
6332 		return error;
6333 
6334 	add_to_waiters(lkb, mstype, to_nodeid);
6335 	dlm_put_lkb(lkb);
6336 	return 0;
6337 }
6338 
6339