xref: /linux/fs/dlm/lock.c (revision 9a95c5bfbf02a0a7f5983280fe284a0ff0836c34)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
6 **
7 **
8 *******************************************************************************
9 ******************************************************************************/
10 
11 /* Central locking logic has four stages:
12 
13    dlm_lock()
14    dlm_unlock()
15 
16    request_lock(ls, lkb)
17    convert_lock(ls, lkb)
18    unlock_lock(ls, lkb)
19    cancel_lock(ls, lkb)
20 
21    _request_lock(r, lkb)
22    _convert_lock(r, lkb)
23    _unlock_lock(r, lkb)
24    _cancel_lock(r, lkb)
25 
26    do_request(r, lkb)
27    do_convert(r, lkb)
28    do_unlock(r, lkb)
29    do_cancel(r, lkb)
30 
31    Stage 1 (lock, unlock) is mainly about checking input args and
32    splitting into one of the four main operations:
33 
34        dlm_lock          = request_lock
35        dlm_lock+CONVERT  = convert_lock
36        dlm_unlock        = unlock_lock
37        dlm_unlock+CANCEL = cancel_lock
38 
39    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
40    provided to the next stage.
41 
42    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
43    When remote, it calls send_xxxx(), when local it calls do_xxxx().
44 
45    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
46    given rsb and lkb and queues callbacks.
47 
48    For remote operations, send_xxxx() results in the corresponding do_xxxx()
49    function being executed on the remote node.  The connecting send/receive
50    calls on local (L) and remote (R) nodes:
51 
52    L: send_xxxx()              ->  R: receive_xxxx()
53                                    R: do_xxxx()
54    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
55 */
56 #include <trace/events/dlm.h>
57 
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "midcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77 
/*
 * Forward declarations of file-local helpers that are referenced before
 * their definitions appear.
 */
static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    const struct dlm_message *ms, bool local);
static int receive_extralen(const struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
static void toss_rsb(struct kref *kref);
93 
/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 *
 * An entry of 1 marks the two modes as compatible, 0 as conflicting.
 * The "+1" in the index places the UN row/column at index 0.
 */

static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
113 
/*
 * This defines the direction of transfer of LVB (lock value block) data.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 * 1 = LVB is returned to the caller
 * 0 = LVB is written to the resource
 * -1 = nothing happens to the LVB
 *
 * Rows and columns use the same UN..PD ordering as the compatibility
 * matrix, so index 0 is UN and index 7 is the PD padding entry.
 */

const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD*/
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
134 
/*
 * Compatibility of the granted mode of lkb @gr with the requested mode of
 * lkb @rq, looked up in __dlm_compat_matrix (1 = compatible, 0 = not).
 */
#define modes_compat(gr, rq)					\
	(__dlm_compat_matrix[(gr)->lkb_grmode + 1]		\
			    [(rq)->lkb_rqmode + 1])
137 
138 int dlm_modes_compat(int mode1, int mode2)
139 {
140 	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
141 }
142 
/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 * Usage: matrix[grmode+1][rqmode+1]
 *
 * Same UN..PD row/column ordering as the lock compatibility matrix.
 */

static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
160 
161 void dlm_print_lkb(struct dlm_lkb *lkb)
162 {
163 	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
164 	       "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
165 	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166 	       dlm_iflags_val(lkb), lkb->lkb_status, lkb->lkb_rqmode,
167 	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
168 	       (unsigned long long)lkb->lkb_recover_seq);
169 }
170 
171 static void dlm_print_rsb(struct dlm_rsb *r)
172 {
173 	printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
174 	       "rlc %d name %s\n",
175 	       r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
176 	       r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
177 	       r->res_name);
178 }
179 
180 void dlm_dump_rsb(struct dlm_rsb *r)
181 {
182 	struct dlm_lkb *lkb;
183 
184 	dlm_print_rsb(r);
185 
186 	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
187 	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
188 	printk(KERN_ERR "rsb lookup list\n");
189 	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
190 		dlm_print_lkb(lkb);
191 	printk(KERN_ERR "rsb grant queue:\n");
192 	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
193 		dlm_print_lkb(lkb);
194 	printk(KERN_ERR "rsb convert queue:\n");
195 	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
196 		dlm_print_lkb(lkb);
197 	printk(KERN_ERR "rsb wait queue:\n");
198 	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
199 		dlm_print_lkb(lkb);
200 }
201 
/*
 * Threads cannot use the lockspace while it's being recovered.
 *
 * Takes ls_in_recovery for reading via down_read(); pair with
 * dlm_unlock_recovery().
 */

void dlm_lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}
208 
/* Release the read side of ls_in_recovery taken by dlm_lock_recovery(). */
void dlm_unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}
213 
/*
 * Non-blocking variant of dlm_lock_recovery().
 *
 * Returns the result of down_read_trylock() on ls_in_recovery.
 */
int dlm_lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}
218 
219 static inline int can_be_queued(struct dlm_lkb *lkb)
220 {
221 	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
222 }
223 
/*
 * Nonzero when DLM_LKF_NOQUEUEBAST is set in lkb_exflags.  Note the raw
 * masked value is returned, not a normalized 0/1.
 */
static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}
228 
/* Tests DLM_SBF_DEMOTED_BIT in the lkb's status-block flags. */
static inline int is_demoted(struct dlm_lkb *lkb)
{
	return test_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
}
233 
/* Tests DLM_SBF_ALTMODE_BIT in the lkb's status-block flags. */
static inline int is_altmode(struct dlm_lkb *lkb)
{
	return test_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
}
238 
/* Returns 1 when the lkb's status is DLM_LKSTS_GRANTED, else 0. */
static inline int is_granted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
}
243 
244 static inline int is_remote(struct dlm_rsb *r)
245 {
246 	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
247 	return !!r->res_nodeid;
248 }
249 
250 static inline int is_process_copy(struct dlm_lkb *lkb)
251 {
252 	return lkb->lkb_nodeid &&
253 	       !test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
254 }
255 
/* Tests DLM_IFL_MSTCPY_BIT in the lkb's internal flags. */
static inline int is_master_copy(struct dlm_lkb *lkb)
{
	return test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
}
260 
261 static inline int middle_conversion(struct dlm_lkb *lkb)
262 {
263 	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264 	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265 		return 1;
266 	return 0;
267 }
268 
269 static inline int down_conversion(struct dlm_lkb *lkb)
270 {
271 	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272 }
273 
/* Tests DLM_IFL_OVERLAP_UNLOCK_BIT in the lkb's internal flags. */
static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
	return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
}
278 
/* Tests DLM_IFL_OVERLAP_CANCEL_BIT in the lkb's internal flags. */
static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
	return test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
}
283 
284 static inline int is_overlap(struct dlm_lkb *lkb)
285 {
286 	return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags) ||
287 	       test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
288 }
289 
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291 {
292 	if (is_master_copy(lkb))
293 		return;
294 
295 	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
296 
297 	if (rv == -DLM_ECANCEL &&
298 	    test_and_clear_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags))
299 		rv = -EDEADLK;
300 
301 	dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, dlm_sbflags_val(lkb));
302 }
303 
304 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
305 {
306 	queue_cast(r, lkb,
307 		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
308 }
309 
310 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
311 {
312 	if (is_master_copy(lkb)) {
313 		send_bast(r, lkb, rqmode);
314 	} else {
315 		dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
316 	}
317 }
318 
319 /*
320  * Basic operations on rsb's and lkb's
321  */
322 
/*
 * Absolute expiry time for an rsb entering toss state: the current
 * jiffies plus dlm_config.ci_toss_secs scaled by HZ.  The config value is
 * read once with READ_ONCE().
 */
static inline unsigned long rsb_toss_jiffies(void)
{
	return jiffies + (READ_ONCE(dlm_config.ci_toss_secs) * HZ);
}
327 
/* This is only called to add a reference when the code already holds
   a valid reference to the rsb, so there's no need for locking.
   Warns if the rsb is flagged RSB_TOSS, then takes the reference anyway. */

static inline void hold_rsb(struct dlm_rsb *r)
{
	/* rsbs in toss state never get referenced */
	WARN_ON(rsb_flag(r, RSB_TOSS));
	kref_get(&r->res_ref);
}
337 
/* Non-static wrapper around hold_rsb(). */
void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}
342 
343 /* TODO move this to lib/refcount.c */
344 static __must_check bool
345 dlm_refcount_dec_and_write_lock_bh(refcount_t *r, rwlock_t *lock)
346 __cond_acquires(lock)
347 {
348 	if (refcount_dec_not_one(r))
349 		return false;
350 
351 	write_lock_bh(lock);
352 	if (!refcount_dec_and_test(r)) {
353 		write_unlock_bh(lock);
354 		return false;
355 	}
356 
357 	return true;
358 }
359 
360 /* TODO move this to include/linux/kref.h */
361 static inline int dlm_kref_put_write_lock_bh(struct kref *kref,
362 					     void (*release)(struct kref *kref),
363 					     rwlock_t *lock)
364 {
365 	if (dlm_refcount_dec_and_write_lock_bh(&kref->refcount, lock)) {
366 		release(kref);
367 		return 1;
368 	}
369 
370 	return 0;
371 }
372 
373 /* When all references to the rsb are gone it's transferred to
374    the tossed list for later disposal. */
375 
376 static void put_rsb(struct dlm_rsb *r)
377 {
378 	struct dlm_ls *ls = r->res_ls;
379 	int rv;
380 
381 	rv = dlm_kref_put_write_lock_bh(&r->res_ref, toss_rsb,
382 					&ls->ls_rsbtbl_lock);
383 	if (rv)
384 		write_unlock_bh(&ls->ls_rsbtbl_lock);
385 }
386 
/* Non-static wrapper around put_rsb(). */
void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}
391 
392 static int pre_rsb_struct(struct dlm_ls *ls)
393 {
394 	struct dlm_rsb *r1, *r2;
395 	int count = 0;
396 
397 	spin_lock_bh(&ls->ls_new_rsb_spin);
398 	if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
399 		spin_unlock_bh(&ls->ls_new_rsb_spin);
400 		return 0;
401 	}
402 	spin_unlock_bh(&ls->ls_new_rsb_spin);
403 
404 	r1 = dlm_allocate_rsb(ls);
405 	r2 = dlm_allocate_rsb(ls);
406 
407 	spin_lock_bh(&ls->ls_new_rsb_spin);
408 	if (r1) {
409 		list_add(&r1->res_hashchain, &ls->ls_new_rsb);
410 		ls->ls_new_rsb_count++;
411 	}
412 	if (r2) {
413 		list_add(&r2->res_hashchain, &ls->ls_new_rsb);
414 		ls->ls_new_rsb_count++;
415 	}
416 	count = ls->ls_new_rsb_count;
417 	spin_unlock_bh(&ls->ls_new_rsb_spin);
418 
419 	if (!count)
420 		return -ENOMEM;
421 	return 0;
422 }
423 
424 /* connected with timer_delete_sync() in dlm_ls_stop() to stop
425  * new timers when recovery is triggered and don't run them
426  * again until a dlm_timer_resume() tries it again.
427  */
428 static void __rsb_mod_timer(struct dlm_ls *ls, unsigned long jiffies)
429 {
430 	if (!dlm_locking_stopped(ls))
431 		mod_timer(&ls->ls_timer, jiffies);
432 }
433 
434 /* This function tries to resume the timer callback if a rsb
435  * is on the toss list and no timer is pending. It might that
436  * the first entry is on currently executed as timer callback
437  * but we don't care if a timer queued up again and does
438  * nothing. Should be a rare case.
439  */
440 void dlm_timer_resume(struct dlm_ls *ls)
441 {
442 	struct dlm_rsb *r;
443 
444 	spin_lock_bh(&ls->ls_toss_q_lock);
445 	r = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb,
446 				     res_toss_q_list);
447 	if (r && !timer_pending(&ls->ls_timer))
448 		__rsb_mod_timer(ls, r->res_toss_time);
449 	spin_unlock_bh(&ls->ls_toss_q_lock);
450 }
451 
452 /* ls_rsbtbl_lock must be held and being sure the rsb is in toss state */
453 static void rsb_delete_toss_timer(struct dlm_ls *ls, struct dlm_rsb *r)
454 {
455 	struct dlm_rsb *first;
456 
457 	spin_lock_bh(&ls->ls_toss_q_lock);
458 	r->res_toss_time = 0;
459 
460 	/* if the rsb is not queued do nothing */
461 	if (list_empty(&r->res_toss_q_list))
462 		goto out;
463 
464 	/* get the first element before delete */
465 	first = list_first_entry(&ls->ls_toss_q, struct dlm_rsb,
466 				 res_toss_q_list);
467 	list_del_init(&r->res_toss_q_list);
468 	/* check if the first element was the rsb we deleted */
469 	if (first == r) {
470 		/* try to get the new first element, if the list
471 		 * is empty now try to delete the timer, if we are
472 		 * too late we don't care.
473 		 *
474 		 * if the list isn't empty and a new first element got
475 		 * in place, set the new timer expire time.
476 		 */
477 		first = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb,
478 						 res_toss_q_list);
479 		if (!first)
480 			timer_delete(&ls->ls_timer);
481 		else
482 			__rsb_mod_timer(ls, first->res_toss_time);
483 	}
484 
485 out:
486 	spin_unlock_bh(&ls->ls_toss_q_lock);
487 }
488 
/* (Re)queue @r at the tail of the toss queue with a fresh expiry time and
 * make sure the timer is armed for the queue head.
 *
 * The caller must hold ls_rsbtbl_lock.  This needs to be called every
 * time the rsb enters toss state, or when the dir/master nodeid of an rsb
 * in toss state changes.
 */
static void rsb_mod_timer(struct dlm_ls *ls, struct dlm_rsb *r)
{
	int our_nodeid = dlm_our_nodeid();
	struct dlm_rsb *first;

	/* If we're the directory record for this rsb, and
	 * we're not the master of it, then we need to wait
	 * for the master node to send us a dir remove
	 * before removing the dir record, so no timer is
	 * kept for this rsb.
	 */
	if (!dlm_no_directory(ls) &&
	    (r->res_master_nodeid != our_nodeid) &&
	    (dlm_dir_nodeid(r) == our_nodeid)) {
		rsb_delete_toss_timer(ls, r);
		return;
	}

	spin_lock_bh(&ls->ls_toss_q_lock);
	/* set the new rsb absolute expire time in the rsb */
	r->res_toss_time = rsb_toss_jiffies();
	if (list_empty(&ls->ls_toss_q)) {
		/* if the queue is empty, add the element; its expire
		 * time becomes the timer's expire time
		 */
		list_add_tail(&r->res_toss_q_list, &ls->ls_toss_q);
		__rsb_mod_timer(ls, r->res_toss_time);
	} else {
		/* check if the rsb was already queued, if so delete
		 * it from the toss queue
		 */
		if (!list_empty(&r->res_toss_q_list))
			list_del(&r->res_toss_q_list);

		/* Look up the (possibly new) first element, then append
		 * this rsb at the end of the queue.  If removing the rsb
		 * above left the queue empty, this rsb's expire time is
		 * the next expiration; otherwise the first element's
		 * expire time is.
		 */
		first = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb,
						 res_toss_q_list);
		list_add_tail(&r->res_toss_q_list, &ls->ls_toss_q);
		if (!first)
			__rsb_mod_timer(ls, r->res_toss_time);
		else
			__rsb_mod_timer(ls, first->res_toss_time);
	}
	spin_unlock_bh(&ls->ls_toss_q_lock);
}
542 
/* If the timer callback hits lock contention, it retries the trylock
 * 250 ms later.  Any other mod_timer() issued in between may make the
 * timer expire earlier, which is fine; this retry only covers the
 * unlikely case that nothing else happens in that time.
 */
#define DLM_TOSS_TIMER_RETRY	(jiffies + msecs_to_jiffies(250))
549 
/* Timer callback for ls_timer: free expired rsbs from the toss queue.
 *
 * Repeatedly takes the head of ls_toss_q; if it has expired, the rsb is
 * removed from the rsb list and hash table, a remove message is sent to
 * the directory node when required, and the rsb is freed.  The loop ends
 * when locking is stopped, the queue is empty, the head has not expired
 * yet (timer re-armed for it), or one of the two trylocks fails (timer
 * re-armed with DLM_TOSS_TIMER_RETRY).
 */
void dlm_rsb_toss_timer(struct timer_list *timer)
{
	struct dlm_ls *ls = from_timer(ls, timer, ls_timer);
	int our_nodeid = dlm_our_nodeid();
	struct dlm_rsb *r;
	int rv;

	while (1) {
		/* interruption point to leave the iteration when
		 * recovery waits for timer_delete_sync(); recovery
		 * will take care of deleting everything in the toss queue.
		 */
		if (dlm_locking_stopped(ls))
			break;

		rv = spin_trylock(&ls->ls_toss_q_lock);
		if (!rv) {
			/* contention: rearm the timer to try again later */
			__rsb_mod_timer(ls, DLM_TOSS_TIMER_RETRY);
			break;
		}

		r = list_first_entry_or_null(&ls->ls_toss_q, struct dlm_rsb,
					     res_toss_q_list);
		if (!r) {
			/* nothing left to do; the next rsb that gets
			 * queued will set the next mod_timer() expiry.
			 */
			spin_unlock(&ls->ls_toss_q_lock);
			break;
		}

		/* test if the first rsb isn't expired yet, if
		 * so we stop freeing rsbs from the toss queue as
		 * the queue is ordered ascending by the
		 * absolute res_toss_time jiffies
		 */
		if (time_before(jiffies, r->res_toss_time)) {
			/* rearm with the next rsb to expire in the future */
			__rsb_mod_timer(ls, r->res_toss_time);
			spin_unlock(&ls->ls_toss_q_lock);
			break;
		}

		/* find_rsb_dir/nodir take these two locks in the reverse
		 * order; this is only a trylock, so on contention we
		 * back off and try again later.
		 *
		 * Taking ls_rsbtbl_lock while holding ls_toss_q_lock
		 * ensures that rsb_delete_toss_timer() and
		 * rsb_mod_timer() cannot run on this rsb after this
		 * callback has deleted it from ls_toss_q.  The other
		 * holders always have priority, since this is only
		 * cache handling and they may want to take this rsb
		 * out of toss state.
		 */
		rv = write_trylock(&ls->ls_rsbtbl_lock);
		if (!rv) {
			spin_unlock(&ls->ls_toss_q_lock);
			/* contention: rearm the timer to try again later */
			__rsb_mod_timer(ls, DLM_TOSS_TIMER_RETRY);
			break;
		}

		list_del(&r->res_rsbs_list);
		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
				       dlm_rhash_rsb_params);

		/* it is not necessary to hold ls_rsbtbl_lock when
		 * calling send_remove()
		 */
		write_unlock(&ls->ls_rsbtbl_lock);

		/* remove the rsb from the toss queue, it is gone
		 * from DLM now
		 */
		list_del_init(&r->res_toss_q_list);
		spin_unlock(&ls->ls_toss_q_lock);

		/* no rsb in this state should ever run a timer */
		WARN_ON(!dlm_no_directory(ls) &&
			(r->res_master_nodeid != our_nodeid) &&
			(dlm_dir_nodeid(r) == our_nodeid));

		/* We're the master of this rsb but we're not
		 * the directory record, so we need to tell the
		 * dir node to remove the dir record
		 */
		if (!dlm_no_directory(ls) &&
		    (r->res_master_nodeid == our_nodeid) &&
		    (dlm_dir_nodeid(r) != our_nodeid))
			send_remove(r);

		free_toss_rsb(r);
	}
}
646 
647 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
648    unlock any spinlocks, go back and call pre_rsb_struct again.
649    Otherwise, take an rsb off the list and return it. */
650 
651 static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
652 			  struct dlm_rsb **r_ret)
653 {
654 	struct dlm_rsb *r;
655 	int count;
656 
657 	spin_lock_bh(&ls->ls_new_rsb_spin);
658 	if (list_empty(&ls->ls_new_rsb)) {
659 		count = ls->ls_new_rsb_count;
660 		spin_unlock_bh(&ls->ls_new_rsb_spin);
661 		log_debug(ls, "find_rsb retry %d %d %s",
662 			  count, dlm_config.ci_new_rsb_count,
663 			  (const char *)name);
664 		return -EAGAIN;
665 	}
666 
667 	r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
668 	list_del(&r->res_hashchain);
669 	ls->ls_new_rsb_count--;
670 	spin_unlock_bh(&ls->ls_new_rsb_spin);
671 
672 	r->res_ls = ls;
673 	r->res_length = len;
674 	memcpy(r->res_name, name, len);
675 	spin_lock_init(&r->res_lock);
676 
677 	INIT_LIST_HEAD(&r->res_lookup);
678 	INIT_LIST_HEAD(&r->res_grantqueue);
679 	INIT_LIST_HEAD(&r->res_convertqueue);
680 	INIT_LIST_HEAD(&r->res_waitqueue);
681 	INIT_LIST_HEAD(&r->res_root_list);
682 	INIT_LIST_HEAD(&r->res_toss_q_list);
683 	INIT_LIST_HEAD(&r->res_recover_list);
684 	INIT_LIST_HEAD(&r->res_masters_list);
685 
686 	*r_ret = r;
687 	return 0;
688 }
689 
690 int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
691 			struct dlm_rsb **r_ret)
692 {
693 	char key[DLM_RESNAME_MAXLEN] = {};
694 
695 	memcpy(key, name, len);
696 	*r_ret = rhashtable_lookup_fast(rhash, &key, dlm_rhash_rsb_params);
697 	if (*r_ret)
698 		return 0;
699 
700 	return -EBADR;
701 }
702 
/*
 * Insert @rsb into the hash table @rhash via its res_node link.
 * Returns whatever rhashtable_insert_fast() returns; find_rsb_dir()
 * handles -EEXIST from it as "lost a race, look up again".
 */
static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
{
	return rhashtable_insert_fast(rhash, &rsb->res_node,
				      dlm_rhash_rsb_params);
}
708 
709 /*
710  * Find rsb in rsbtbl and potentially create/add one
711  *
712  * Delaying the release of rsb's has a similar benefit to applications keeping
713  * NL locks on an rsb, but without the guarantee that the cached master value
714  * will still be valid when the rsb is reused.  Apps aren't always smart enough
715  * to keep NL locks on an rsb that they may lock again shortly; this can lead
716  * to excessive master lookups and removals if we don't delay the release.
717  *
718  * Searching for an rsb means looking through both the normal list and toss
719  * list.  When found on the toss list the rsb is moved to the normal list with
720  * ref count of 1; when found on normal list the ref count is incremented.
721  *
722  * rsb's on the keep list are being used locally and refcounted.
723  * rsb's on the toss list are not being used locally, and are not refcounted.
724  *
725  * The toss list rsb's were either
726  * - previously used locally but not any more (were on keep list, then
727  *   moved to toss list when last refcount dropped)
728  * - created and put on toss list as a directory record for a lookup
729  *   (we are the dir node for the res, but are not using the res right now,
730  *   but some other node is)
731  *
732  * The purpose of find_rsb() is to return a refcounted rsb for local use.
733  * So, if the given rsb is on the toss list, it is moved to the keep list
734  * before being returned.
735  *
736  * toss_rsb() happens when all local usage of the rsb is done, i.e. no
737  * more refcounts exist, so the rsb is moved from the keep list to the
738  * toss list.
739  *
740  * rsb's on both keep and toss lists are used for doing a name to master
741  * lookups.  rsb's that are in use locally (and being refcounted) are on
742  * the keep list, rsb's that are not in use locally (not refcounted) and
743  * only exist for name/master lookups are on the toss list.
744  *
 * rsb's on the toss list whose dir_nodeid is not local can have stale
746  * name/master mappings.  So, remote requests on such rsb's can potentially
747  * return with an error, which means the mapping is stale and needs to
748  * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
749  * first_lkid is to keep only a single outstanding request on an rsb
750  * while that rsb has a potentially stale master.)
751  */
752 
753 static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
754 			uint32_t hash, int dir_nodeid, int from_nodeid,
755 			unsigned int flags, struct dlm_rsb **r_ret)
756 {
757 	struct dlm_rsb *r = NULL;
758 	int our_nodeid = dlm_our_nodeid();
759 	int from_local = 0;
760 	int from_other = 0;
761 	int from_dir = 0;
762 	int create = 0;
763 	int error;
764 
765 	if (flags & R_RECEIVE_REQUEST) {
766 		if (from_nodeid == dir_nodeid)
767 			from_dir = 1;
768 		else
769 			from_other = 1;
770 	} else if (flags & R_REQUEST) {
771 		from_local = 1;
772 	}
773 
774 	/*
775 	 * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
776 	 * from_nodeid has sent us a lock in dlm_recover_locks, believing
777 	 * we're the new master.  Our local recovery may not have set
778 	 * res_master_nodeid to our_nodeid yet, so allow either.  Don't
779 	 * create the rsb; dlm_recover_process_copy() will handle EBADR
780 	 * by resending.
781 	 *
782 	 * If someone sends us a request, we are the dir node, and we do
783 	 * not find the rsb anywhere, then recreate it.  This happens if
784 	 * someone sends us a request after we have removed/freed an rsb
785 	 * from our toss list.  (They sent a request instead of lookup
786 	 * because they are using an rsb from their toss list.)
787 	 */
788 
789 	if (from_local || from_dir ||
790 	    (from_other && (dir_nodeid == our_nodeid))) {
791 		create = 1;
792 	}
793 
794  retry:
795 	if (create) {
796 		error = pre_rsb_struct(ls);
797 		if (error < 0)
798 			goto out;
799 	}
800 
801  retry_lookup:
802 
803 	/* check if the rsb is in keep state under read lock - likely path */
804 	read_lock_bh(&ls->ls_rsbtbl_lock);
805 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
806 	if (error) {
807 		read_unlock_bh(&ls->ls_rsbtbl_lock);
808 		goto do_new;
809 	}
810 
811 	/*
812 	 * rsb is active, so we can't check master_nodeid without lock_rsb.
813 	 */
814 
815 	if (rsb_flag(r, RSB_TOSS)) {
816 		read_unlock_bh(&ls->ls_rsbtbl_lock);
817 		goto do_toss;
818 	}
819 
820 	kref_get(&r->res_ref);
821 	read_unlock_bh(&ls->ls_rsbtbl_lock);
822 	goto out;
823 
824 
825  do_toss:
826 	write_lock_bh(&ls->ls_rsbtbl_lock);
827 
828 	/* retry lookup under write lock to see if its still in toss state
829 	 * if not it's in keep state and we relookup - unlikely path.
830 	 */
831 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
832 	if (!error) {
833 		if (!rsb_flag(r, RSB_TOSS)) {
834 			write_unlock_bh(&ls->ls_rsbtbl_lock);
835 			goto retry_lookup;
836 		}
837 	} else {
838 		write_unlock_bh(&ls->ls_rsbtbl_lock);
839 		goto do_new;
840 	}
841 
842 	/*
843 	 * rsb found inactive (master_nodeid may be out of date unless
844 	 * we are the dir_nodeid or were the master)  No other thread
845 	 * is using this rsb because it's on the toss list, so we can
846 	 * look at or update res_master_nodeid without lock_rsb.
847 	 */
848 
849 	if ((r->res_master_nodeid != our_nodeid) && from_other) {
850 		/* our rsb was not master, and another node (not the dir node)
851 		   has sent us a request */
852 		log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
853 			  from_nodeid, r->res_master_nodeid, dir_nodeid,
854 			  r->res_name);
855 		write_unlock_bh(&ls->ls_rsbtbl_lock);
856 		error = -ENOTBLK;
857 		goto out;
858 	}
859 
860 	if ((r->res_master_nodeid != our_nodeid) && from_dir) {
861 		/* don't think this should ever happen */
862 		log_error(ls, "find_rsb toss from_dir %d master %d",
863 			  from_nodeid, r->res_master_nodeid);
864 		dlm_print_rsb(r);
865 		/* fix it and go on */
866 		r->res_master_nodeid = our_nodeid;
867 		r->res_nodeid = 0;
868 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
869 		r->res_first_lkid = 0;
870 	}
871 
872 	if (from_local && (r->res_master_nodeid != our_nodeid)) {
873 		/* Because we have held no locks on this rsb,
874 		   res_master_nodeid could have become stale. */
875 		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
876 		r->res_first_lkid = 0;
877 	}
878 
879 	list_move(&r->res_rsbs_list, &ls->ls_keep);
880 	rsb_clear_flag(r, RSB_TOSS);
881 	/* rsb got out of toss state, it becomes alive again
882 	 * and we reinit the reference counter that is only
883 	 * valid for keep state rsbs
884 	 */
885 	kref_init(&r->res_ref);
886 	rsb_delete_toss_timer(ls, r);
887 	write_unlock_bh(&ls->ls_rsbtbl_lock);
888 
889 	goto out;
890 
891 
892  do_new:
893 	/*
894 	 * rsb not found
895 	 */
896 
897 	if (error == -EBADR && !create)
898 		goto out;
899 
900 	error = get_rsb_struct(ls, name, len, &r);
901 	if (error == -EAGAIN)
902 		goto retry;
903 	if (error)
904 		goto out;
905 
906 	r->res_hash = hash;
907 	r->res_dir_nodeid = dir_nodeid;
908 	kref_init(&r->res_ref);
909 
910 	if (from_dir) {
911 		/* want to see how often this happens */
912 		log_debug(ls, "find_rsb new from_dir %d recreate %s",
913 			  from_nodeid, r->res_name);
914 		r->res_master_nodeid = our_nodeid;
915 		r->res_nodeid = 0;
916 		goto out_add;
917 	}
918 
919 	if (from_other && (dir_nodeid != our_nodeid)) {
920 		/* should never happen */
921 		log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
922 			  from_nodeid, dir_nodeid, our_nodeid, r->res_name);
923 		dlm_free_rsb(r);
924 		r = NULL;
925 		error = -ENOTBLK;
926 		goto out;
927 	}
928 
929 	if (from_other) {
930 		log_debug(ls, "find_rsb new from_other %d dir %d %s",
931 			  from_nodeid, dir_nodeid, r->res_name);
932 	}
933 
934 	if (dir_nodeid == our_nodeid) {
935 		/* When we are the dir nodeid, we can set the master
936 		   node immediately */
937 		r->res_master_nodeid = our_nodeid;
938 		r->res_nodeid = 0;
939 	} else {
940 		/* set_master will send_lookup to dir_nodeid */
941 		r->res_master_nodeid = 0;
942 		r->res_nodeid = -1;
943 	}
944 
945  out_add:
946 
947 	write_lock_bh(&ls->ls_rsbtbl_lock);
948 	error = rsb_insert(r, &ls->ls_rsbtbl);
949 	if (error == -EEXIST) {
950 		/* somebody else was faster and it seems the
951 		 * rsb exists now, we do a whole relookup
952 		 */
953 		write_unlock_bh(&ls->ls_rsbtbl_lock);
954 		dlm_free_rsb(r);
955 		goto retry_lookup;
956 	} else if (!error) {
957 		list_add(&r->res_rsbs_list, &ls->ls_keep);
958 	}
959 	write_unlock_bh(&ls->ls_rsbtbl_lock);
960  out:
961 	*r_ret = r;
962 	return error;
963 }
964 
/* During recovery, other nodes can send us new MSTCPY locks (from
   dlm_recover_locks) before we've made ourself master (in
   dlm_recover_masters). */

/*
 * find_rsb_nodir - look up, revive or create the rsb for @name when the
 * lockspace runs without a resource directory.
 *
 * Three outcomes are possible:
 *  - rsb found in keep state: a reference is taken with kref_get() and the
 *    rsb is returned as is (master is not checked here).
 *  - rsb found with RSB_TOSS set: it is re-checked under the write lock,
 *    moved to ls_keep, its refcount is re-initialised and its toss timer
 *    is deleted.
 *  - rsb not found: a new one is set up with dir_nodeid as master and
 *    inserted into the table and ls_keep.
 *
 * When R_RECEIVE_RECOVER is set in @flags the master sanity checks on a
 * toss-state rsb are skipped.
 *
 * Returns the error code of the last operation (0 on success) and always
 * stores the current value of r in *r_ret.
 * NOTE(review): on the -ENOTBLK path *r_ret still points at the toss-state
 * rsb (no reference taken); callers have to look at the return value.
 */
static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
			  uint32_t hash, int dir_nodeid, int from_nodeid,
			  unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r = NULL;
	int our_nodeid = dlm_our_nodeid();
	int recover = (flags & R_RECEIVE_RECOVER);
	int error;

 retry:
	/* presumably prepares for a later get_rsb_struct(); we come back
	 * here whenever get_rsb_struct() reports -EAGAIN
	 */
	error = pre_rsb_struct(ls);
	if (error < 0)
		goto out;

 retry_lookup:

	/* check if the rsb is in keep state under read lock - likely path */
	read_lock_bh(&ls->ls_rsbtbl_lock);
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
	if (error) {
		read_unlock_bh(&ls->ls_rsbtbl_lock);
		goto do_new;
	}

	if (rsb_flag(r, RSB_TOSS)) {
		/* reviving a toss rsb modifies it, so redo the lookup
		 * under the write lock
		 */
		read_unlock_bh(&ls->ls_rsbtbl_lock);
		goto do_toss;
	}

	/*
	 * rsb is active, so we can't check master_nodeid without lock_rsb.
	 */

	kref_get(&r->res_ref);
	read_unlock_bh(&ls->ls_rsbtbl_lock);

	goto out;


 do_toss:
	write_lock_bh(&ls->ls_rsbtbl_lock);

	/* retry lookup under write lock to see if it's still in toss state;
	 * if not, it's in keep state and we relookup - unlikely path.
	 */
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
	if (!error) {
		if (!rsb_flag(r, RSB_TOSS)) {
			write_unlock_bh(&ls->ls_rsbtbl_lock);
			goto retry_lookup;
		}
	} else {
		/* the rsb vanished between the two lookups */
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		goto do_new;
	}


	/*
	 * rsb found inactive. No other thread is using this rsb because
	 * it's on the toss list, so we can look at or update
	 * res_master_nodeid without lock_rsb.
	 */

	if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
		/* our rsb is not master, and another node has sent us a
		   request; this should never happen */
		log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
			  from_nodeid, r->res_master_nodeid, dir_nodeid);
		dlm_print_rsb(r);
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		error = -ENOTBLK;
		goto out;
	}

	if (!recover && (r->res_master_nodeid != our_nodeid) &&
	    (dir_nodeid == our_nodeid)) {
		/* our rsb is not master, and we are dir; may as well fix it;
		   this should never happen */
		log_error(ls, "find_rsb toss our %d master %d dir %d",
			  our_nodeid, r->res_master_nodeid, dir_nodeid);
		dlm_print_rsb(r);
		r->res_master_nodeid = our_nodeid;
		r->res_nodeid = 0;
	}

	list_move(&r->res_rsbs_list, &ls->ls_keep);
	rsb_clear_flag(r, RSB_TOSS);
	/* rsb got out of toss state, it becomes alive again
	 * and we reinit the reference counter that is only
	 * valid for keep state rsbs
	 */
	kref_init(&r->res_ref);
	rsb_delete_toss_timer(ls, r);
	write_unlock_bh(&ls->ls_rsbtbl_lock);

	/* error is still 0 from the successful lookup above */
	goto out;


 do_new:
	/*
	 * rsb not found
	 */

	error = get_rsb_struct(ls, name, len, &r);
	if (error == -EAGAIN) {
		goto retry;
	}
	if (error)
		goto out;

	/* without a directory the dir node is also used as the master */
	r->res_hash = hash;
	r->res_dir_nodeid = dir_nodeid;
	r->res_master_nodeid = dir_nodeid;
	r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
	kref_init(&r->res_ref);

	write_lock_bh(&ls->ls_rsbtbl_lock);
	error = rsb_insert(r, &ls->ls_rsbtbl);
	if (error == -EEXIST) {
		/* somebody else was faster and it seems the
		 * rsb exists now, we do a whole relookup
		 */
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		dlm_free_rsb(r);
		goto retry_lookup;
	} else if (!error) {
		list_add(&r->res_rsbs_list, &ls->ls_keep);
	}
	/* any other rsb_insert() error is passed back to the caller */
	write_unlock_bh(&ls->ls_rsbtbl_lock);

 out:
	*r_ret = r;
	return error;
}
1103 
1104 static int find_rsb(struct dlm_ls *ls, const void *name, int len,
1105 		    int from_nodeid, unsigned int flags,
1106 		    struct dlm_rsb **r_ret)
1107 {
1108 	int dir_nodeid;
1109 	uint32_t hash;
1110 
1111 	if (len > DLM_RESNAME_MAXLEN)
1112 		return -EINVAL;
1113 
1114 	hash = jhash(name, len, 0);
1115 	dir_nodeid = dlm_hash2nodeid(ls, hash);
1116 
1117 	if (dlm_no_directory(ls))
1118 		return find_rsb_nodir(ls, name, len, hash, dir_nodeid,
1119 				      from_nodeid, flags, r_ret);
1120 	else
1121 		return find_rsb_dir(ls, name, len, hash, dir_nodeid,
1122 				    from_nodeid, flags, r_ret);
1123 }
1124 
1125 /* we have received a request and found that res_master_nodeid != our_nodeid,
1126    so we need to return an error or make ourself the master */
1127 
1128 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
1129 				  int from_nodeid)
1130 {
1131 	if (dlm_no_directory(ls)) {
1132 		log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
1133 			  from_nodeid, r->res_master_nodeid,
1134 			  r->res_dir_nodeid);
1135 		dlm_print_rsb(r);
1136 		return -ENOTBLK;
1137 	}
1138 
1139 	if (from_nodeid != r->res_dir_nodeid) {
1140 		/* our rsb is not master, and another node (not the dir node)
1141 	   	   has sent us a request.  this is much more common when our
1142 	   	   master_nodeid is zero, so limit debug to non-zero.  */
1143 
1144 		if (r->res_master_nodeid) {
1145 			log_debug(ls, "validate master from_other %d master %d "
1146 				  "dir %d first %x %s", from_nodeid,
1147 				  r->res_master_nodeid, r->res_dir_nodeid,
1148 				  r->res_first_lkid, r->res_name);
1149 		}
1150 		return -ENOTBLK;
1151 	} else {
1152 		/* our rsb is not master, but the dir nodeid has sent us a
1153 	   	   request; this could happen with master 0 / res_nodeid -1 */
1154 
1155 		if (r->res_master_nodeid) {
1156 			log_error(ls, "validate master from_dir %d master %d "
1157 				  "first %x %s",
1158 				  from_nodeid, r->res_master_nodeid,
1159 				  r->res_first_lkid, r->res_name);
1160 		}
1161 
1162 		r->res_master_nodeid = dlm_our_nodeid();
1163 		r->res_nodeid = 0;
1164 		return 0;
1165 	}
1166 }
1167 
/*
 * __dlm_master_lookup - answer a master lookup for an rsb that already
 * exists, repairing its master fields where the flags ask for it.
 *
 * @our_nodeid:  local nodeid, expected to equal r->res_dir_nodeid
 * @from_nodeid: node that sent the lookup
 * @toss_list:   true when the rsb was found in toss state; only used to
 *               log an unexpected fix_master on such an rsb
 * @flags:       DLM_LU_RECOVER_MASTER selects fix_master,
 *               DLM_LU_RECOVER_DIR selects from_master
 * @r_nodeid:    receives r->res_master_nodeid
 * @result:      optional; set to DLM_LU_MATCH when non-NULL
 *
 * The caller is responsible for serialising access to @r (the visible
 * callers hold either lock_rsb or the rsbtbl write lock).
 */
static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
				int from_nodeid, bool toss_list, unsigned int flags,
				int *r_nodeid, int *result)
{
	int fix_master = (flags & DLM_LU_RECOVER_MASTER);
	int from_master = (flags & DLM_LU_RECOVER_DIR);

	if (r->res_dir_nodeid != our_nodeid) {
		/* should not happen, but may as well fix it and carry on */
		log_error(ls, "%s res_dir %d our %d %s", __func__,
			  r->res_dir_nodeid, our_nodeid, r->res_name);
		r->res_dir_nodeid = our_nodeid;
	}

	if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
		/* Recovery uses this function to set a new master when
		 * the previous master failed.  Setting NEW_MASTER will
		 * force dlm_recover_masters to call recover_master on this
		 * rsb even though the res_nodeid is no longer removed.
		 */

		r->res_master_nodeid = from_nodeid;
		r->res_nodeid = from_nodeid;
		rsb_set_flag(r, RSB_NEW_MASTER);

		if (toss_list) {
			/* I don't think we should ever find it on toss list. */
			log_error(ls, "%s fix_master on toss", __func__);
			dlm_dump_rsb(r);
		}
	}

	if (from_master && (r->res_master_nodeid != from_nodeid)) {
		/* this will happen if from_nodeid became master during
		 * a previous recovery cycle, and we aborted the previous
		 * cycle before recovering this master value
		 */

		log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
			  __func__, from_nodeid, r->res_master_nodeid,
			  r->res_nodeid, r->res_first_lkid, r->res_name);

		if (r->res_master_nodeid == our_nodeid) {
			/* we believe we are master ourself: keep our value
			 * and report it instead of switching to from_nodeid
			 */
			log_error(ls, "from_master %d our_master", from_nodeid);
			dlm_dump_rsb(r);
			goto ret_assign;
		}

		r->res_master_nodeid = from_nodeid;
		r->res_nodeid = from_nodeid;
		rsb_set_flag(r, RSB_NEW_MASTER);
	}

	if (!r->res_master_nodeid) {
		/* this will happen if recovery happens while we're looking
		 * up the master for this rsb
		 */

		log_debug(ls, "%s master 0 to %d first %x %s", __func__,
			  from_nodeid, r->res_first_lkid, r->res_name);
		r->res_master_nodeid = from_nodeid;
		r->res_nodeid = from_nodeid;
	}

	if (!from_master && !fix_master &&
	    (r->res_master_nodeid == from_nodeid)) {
		/* this can happen when the master sends remove, the dir node
		 * finds the rsb on the keep list and ignores the remove,
		 * and the former master sends a lookup
		 */

		log_limit(ls, "%s from master %d flags %x first %x %s",
			  __func__, from_nodeid, flags, r->res_first_lkid,
			  r->res_name);
	}

 ret_assign:
	/* hand back whatever master value the rsb holds at this point */
	*r_nodeid = r->res_master_nodeid;
	if (result)
		*result = DLM_LU_MATCH;
}
1249 
1250 /*
1251  * We're the dir node for this res and another node wants to know the
1252  * master nodeid.  During normal operation (non recovery) this is only
1253  * called from receive_lookup(); master lookups when the local node is
1254  * the dir node are done by find_rsb().
1255  *
1256  * normal operation, we are the dir node for a resource
1257  * . _request_lock
1258  * . set_master
1259  * . send_lookup
1260  * . receive_lookup
1261  * . dlm_master_lookup flags 0
1262  *
1263  * recover directory, we are rebuilding dir for all resources
1264  * . dlm_recover_directory
1265  * . dlm_rcom_names
1266  *   remote node sends back the rsb names it is master of and we are dir of
1267  * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
1268  *   we either create new rsb setting remote node as master, or find existing
1269  *   rsb and set master to be the remote node.
1270  *
1271  * recover masters, we are finding the new master for resources
1272  * . dlm_recover_masters
1273  * . recover_master
1274  * . dlm_send_rcom_lookup
1275  * . receive_rcom_lookup
1276  * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
1277  */
1278 
/*
 * Returns -EINVAL when the name is too long, when the request claims to
 * come from our own nodeid, or when we are not the dir node for the name
 * (in that last case *r_nodeid is set to -1).  Errors from
 * pre_rsb_struct() and get_rsb_struct() (other than -EAGAIN, which is
 * retried) are passed back.  Otherwise returns 0 with *r_nodeid filled
 * in; *result, when non-NULL, is DLM_LU_MATCH for an existing rsb or
 * DLM_LU_ADD when a new rsb was created in toss state.
 */
int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
		      int len, unsigned int flags, int *r_nodeid, int *result)
{
	struct dlm_rsb *r = NULL;
	uint32_t hash;
	int our_nodeid = dlm_our_nodeid();
	int dir_nodeid, error;

	if (len > DLM_RESNAME_MAXLEN)
		return -EINVAL;

	if (from_nodeid == our_nodeid) {
		log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
			  our_nodeid, flags);
		return -EINVAL;
	}

	hash = jhash(name, len, 0);
	dir_nodeid = dlm_hash2nodeid(ls, hash);
	if (dir_nodeid != our_nodeid) {
		log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
			  from_nodeid, dir_nodeid, our_nodeid, hash,
			  ls->ls_num_nodes);
		*r_nodeid = -1;
		return -EINVAL;
	}

 retry:
	error = pre_rsb_struct(ls);
	if (error < 0)
		return error;

 retry_lookup:

	/* check if the rsb is in keep state under read lock - likely path */
	read_lock_bh(&ls->ls_rsbtbl_lock);
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
	if (!error) {
		if (rsb_flag(r, RSB_TOSS)) {
			read_unlock_bh(&ls->ls_rsbtbl_lock);
			goto do_toss;
		}

		/* because the rsb is active, we need to lock_rsb before
		 * checking/changing res_master_nodeid
		 */

		hold_rsb(r);
		read_unlock_bh(&ls->ls_rsbtbl_lock);
		lock_rsb(r);

		__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
				    flags, r_nodeid, result);

		/* the rsb was active */
		unlock_rsb(r);
		put_rsb(r);

		return 0;
	} else {
		read_unlock_bh(&ls->ls_rsbtbl_lock);
		goto not_found;
	}

 do_toss:
	/* unlikely path - relookup under write */
	write_lock_bh(&ls->ls_rsbtbl_lock);

	/* rsb_mod_timer() requires ls_rsbtbl_lock to be held for writing;
	 * check if the rsb is still in toss state, if not relookup
	 */
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
	if (!error) {
		if (!rsb_flag(r, RSB_TOSS)) {
			write_unlock_bh(&ls->ls_rsbtbl_lock);
			/* something has changed, very unlikely but
			 * try again
			 */
			goto retry_lookup;
		}
	} else {
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		goto not_found;
	}

	/* because the rsb is inactive (on toss list), it's not refcounted
	 * and lock_rsb is not used, but is protected by the rsbtbl lock
	 */

	__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
			    r_nodeid, result);

	rsb_mod_timer(ls, r);
	/* the rsb was inactive (on toss list) */
	write_unlock_bh(&ls->ls_rsbtbl_lock);

	return 0;

 not_found:
	error = get_rsb_struct(ls, name, len, &r);
	if (error == -EAGAIN)
		goto retry;
	if (error)
		goto out;

	/* record the requesting node as master; the new rsb only serves as
	 * a directory entry here, so it starts out in toss state
	 */
	r->res_hash = hash;
	r->res_dir_nodeid = our_nodeid;
	r->res_master_nodeid = from_nodeid;
	r->res_nodeid = from_nodeid;
	kref_init(&r->res_ref);
	rsb_set_flag(r, RSB_TOSS);

	write_lock_bh(&ls->ls_rsbtbl_lock);
	error = rsb_insert(r, &ls->ls_rsbtbl);
	if (error == -EEXIST) {
		/* somebody else was faster and it seems the
		 * rsb exists now, we do a whole relookup
		 */
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		dlm_free_rsb(r);
		goto retry_lookup;
	} else if (error) {
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		/* should never happen */
		dlm_free_rsb(r);
		goto retry;
	}

	list_add(&r->res_rsbs_list, &ls->ls_toss);
	rsb_mod_timer(ls, r);
	write_unlock_bh(&ls->ls_rsbtbl_lock);

	if (result)
		*result = DLM_LU_ADD;
	*r_nodeid = from_nodeid;
 out:
	return error;
}
1417 
1418 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1419 {
1420 	struct dlm_rsb *r;
1421 
1422 	read_lock_bh(&ls->ls_rsbtbl_lock);
1423 	list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
1424 		if (r->res_hash == hash)
1425 			dlm_dump_rsb(r);
1426 	}
1427 	read_unlock_bh(&ls->ls_rsbtbl_lock);
1428 }
1429 
1430 void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
1431 {
1432 	struct dlm_rsb *r = NULL;
1433 	int error;
1434 
1435 	read_lock_bh(&ls->ls_rsbtbl_lock);
1436 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1437 	if (!error)
1438 		goto out;
1439 
1440 	dlm_dump_rsb(r);
1441  out:
1442 	read_unlock_bh(&ls->ls_rsbtbl_lock);
1443 }
1444 
1445 static void toss_rsb(struct kref *kref)
1446 {
1447 	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1448 	struct dlm_ls *ls = r->res_ls;
1449 
1450 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1451 	rsb_set_flag(r, RSB_TOSS);
1452 	list_move(&r->res_rsbs_list, &ls->ls_toss);
1453 	rsb_mod_timer(ls, r);
1454 
1455 	if (r->res_lvbptr) {
1456 		dlm_free_lvb(r->res_lvbptr);
1457 		r->res_lvbptr = NULL;
1458 	}
1459 }
1460 
/* See comment for unhold_lkb */

/*
 * Drop a reference on @r that is expected not to be the last one: the
 * assert fires if kref_put() reports that the release callback
 * (toss_rsb) was run.
 */
static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;

	/* rsbs in toss state never get referenced */
	WARN_ON(rsb_flag(r, RSB_TOSS));
	rv = kref_put(&r->res_ref, toss_rsb);
	DLM_ASSERT(!rv, dlm_dump_rsb(r););
}
1472 
/*
 * Free an rsb that is in toss state.
 *
 * Warns (once) if RSB_TOSS is not set, then asserts that the rsb is no
 * longer linked on any of its lookup, grant/convert/wait, root, toss_q,
 * recover or masters lists before handing it to dlm_free_rsb().
 */
void free_toss_rsb(struct dlm_rsb *r)
{
	WARN_ON_ONCE(!rsb_flag(r, RSB_TOSS));

	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_toss_q_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r););

	dlm_free_rsb(r);
}
1488 
/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
   The rsb must exist as long as any lkb's for it do. */

/* Take a reference on @r and record it as the resource of @lkb. */
static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}
1497 
1498 static void detach_lkb(struct dlm_lkb *lkb)
1499 {
1500 	if (lkb->lkb_resource) {
1501 		put_rsb(lkb->lkb_resource);
1502 		lkb->lkb_resource = NULL;
1503 	}
1504 }
1505 
1506 static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
1507 		       int start, int end)
1508 {
1509 	struct dlm_lkb *lkb;
1510 	int rv;
1511 
1512 	lkb = dlm_allocate_lkb(ls);
1513 	if (!lkb)
1514 		return -ENOMEM;
1515 
1516 	lkb->lkb_last_bast_cb_mode = DLM_LOCK_IV;
1517 	lkb->lkb_last_cast_cb_mode = DLM_LOCK_IV;
1518 	lkb->lkb_last_cb_mode = DLM_LOCK_IV;
1519 	lkb->lkb_nodeid = -1;
1520 	lkb->lkb_grmode = DLM_LOCK_IV;
1521 	kref_init(&lkb->lkb_ref);
1522 	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1523 	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1524 
1525 	write_lock_bh(&ls->ls_lkbidr_lock);
1526 	rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
1527 	if (rv >= 0)
1528 		lkb->lkb_id = rv;
1529 	write_unlock_bh(&ls->ls_lkbidr_lock);
1530 
1531 	if (rv < 0) {
1532 		log_error(ls, "create_lkb idr error %d", rv);
1533 		dlm_free_lkb(lkb);
1534 		return rv;
1535 	}
1536 
1537 	*lkb_ret = lkb;
1538 	return 0;
1539 }
1540 
/*
 * Create an lkb with an id allocated from start=1, end=0.
 * NOTE(review): idr_alloc() documents end <= 0 as "no upper limit", so
 * this presumably means any id >= 1 - confirm against the idr API.
 */
static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	return _create_lkb(ls, lkb_ret, 1, 0);
}
1545 
1546 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1547 {
1548 	struct dlm_lkb *lkb;
1549 
1550 	read_lock_bh(&ls->ls_lkbidr_lock);
1551 	lkb = idr_find(&ls->ls_lkbidr, lkid);
1552 	if (lkb)
1553 		kref_get(&lkb->lkb_ref);
1554 	read_unlock_bh(&ls->ls_lkbidr_lock);
1555 
1556 	*lkb_ret = lkb;
1557 	return lkb ? 0 : -ENOENT;
1558 }
1559 
/*
 * kref release callback for an lkb (used by __put_lkb()).  It only
 * asserts that the lkb is no longer on a status queue (lkb_status == 0);
 * the actual teardown is done by the caller.
 */
static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}
1569 
1570 /* __put_lkb() is used when an lkb may not have an rsb attached to
1571    it so we need to provide the lockspace explicitly */
1572 
1573 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1574 {
1575 	uint32_t lkid = lkb->lkb_id;
1576 	int rv;
1577 
1578 	rv = dlm_kref_put_write_lock_bh(&lkb->lkb_ref, kill_lkb,
1579 					&ls->ls_lkbidr_lock);
1580 	if (rv) {
1581 		idr_remove(&ls->ls_lkbidr, lkid);
1582 		write_unlock_bh(&ls->ls_lkbidr_lock);
1583 
1584 		detach_lkb(lkb);
1585 
1586 		/* for local/process lkbs, lvbptr points to caller's lksb */
1587 		if (lkb->lkb_lvbptr && is_master_copy(lkb))
1588 			dlm_free_lvb(lkb->lkb_lvbptr);
1589 		dlm_free_lkb(lkb);
1590 	}
1591 
1592 	return rv;
1593 }
1594 
1595 int dlm_put_lkb(struct dlm_lkb *lkb)
1596 {
1597 	struct dlm_ls *ls;
1598 
1599 	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1600 	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1601 
1602 	ls = lkb->lkb_resource->res_ls;
1603 	return __put_lkb(ls, lkb);
1604 }
1605 
/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking.
   It is a plain kref_get() on lkb_ref. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}
1613 
/*
 * kref release callback used by unhold_lkb().  unhold_lkb() must never
 * drop the last reference, so reaching this function is a bug and it
 * asserts unconditionally after printing the lkb.
 */
static void unhold_lkb_assert(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	DLM_ASSERT(false, dlm_print_lkb(lkb););
}
1620 
/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking.
   If it does turn out to be the last ref, unhold_lkb_assert() fires. */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	kref_put(&lkb->lkb_ref, unhold_lkb_assert);
}
1630 
1631 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1632 			    int mode)
1633 {
1634 	struct dlm_lkb *lkb = NULL, *iter;
1635 
1636 	list_for_each_entry(iter, head, lkb_statequeue)
1637 		if (iter->lkb_rqmode < mode) {
1638 			lkb = iter;
1639 			list_add_tail(new, &iter->lkb_statequeue);
1640 			break;
1641 		}
1642 
1643 	if (!lkb)
1644 		list_add_tail(new, head);
1645 }
1646 
1647 /* add/remove lkb to rsb's grant/convert/wait queue */
1648 
1649 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1650 {
1651 	kref_get(&lkb->lkb_ref);
1652 
1653 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1654 
1655 	lkb->lkb_timestamp = ktime_get();
1656 
1657 	lkb->lkb_status = status;
1658 
1659 	switch (status) {
1660 	case DLM_LKSTS_WAITING:
1661 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1662 			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1663 		else
1664 			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1665 		break;
1666 	case DLM_LKSTS_GRANTED:
1667 		/* convention says granted locks kept in order of grmode */
1668 		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1669 				lkb->lkb_grmode);
1670 		break;
1671 	case DLM_LKSTS_CONVERT:
1672 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1673 			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1674 		else
1675 			list_add_tail(&lkb->lkb_statequeue,
1676 				      &r->res_convertqueue);
1677 		break;
1678 	default:
1679 		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1680 	}
1681 }
1682 
/*
 * Take @lkb off whichever rsb queue it is on: clear lkb_status, unlink
 * it from the state queue and drop the reference add_lkb() took.
 * @r is not used by the body.
 */
static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}
1689 
/*
 * Move @lkb from its current rsb queue to the one selected by @sts.
 * An extra reference is held across the del/add pair so the unhold in
 * del_lkb() can never be the last one.
 */
static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
	unhold_lkb(lkb);
}
1697 
1698 static int msg_reply_type(int mstype)
1699 {
1700 	switch (mstype) {
1701 	case DLM_MSG_REQUEST:
1702 		return DLM_MSG_REQUEST_REPLY;
1703 	case DLM_MSG_CONVERT:
1704 		return DLM_MSG_CONVERT_REPLY;
1705 	case DLM_MSG_UNLOCK:
1706 		return DLM_MSG_UNLOCK_REPLY;
1707 	case DLM_MSG_CANCEL:
1708 		return DLM_MSG_CANCEL_REPLY;
1709 	case DLM_MSG_LOOKUP:
1710 		return DLM_MSG_LOOKUP_REPLY;
1711 	}
1712 	return -1;
1713 }
1714 
1715 /* add/remove lkb from global waiters list of lkb's waiting for
1716    a reply from a remote node */
1717 
/*
 * add_to_waiters - note that @lkb is waiting for a reply to a message
 * of type @mstype sent to @to_nodeid.
 *
 * Runs under ls_waiters_lock.  Returns:
 *   0       the wait was recorded (either as the primary wait or as an
 *           overlapping unlock/cancel on top of an existing wait)
 *   -EINVAL an overlapping unlock is already pending, or a cancel is
 *           requested while an overlapping cancel is already pending
 *   -EBUSY  the lkb is already waiting and @mstype is neither unlock
 *           nor cancel
 * Every recorded wait bumps lkb_wait_count and takes an lkb reference.
 */
static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error = 0;

	spin_lock_bh(&ls->ls_waiters_lock);

	if (is_overlap_unlock(lkb) ||
	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
		error = -EINVAL;
		goto out;
	}

	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
		/* something is already outstanding: only an unlock or a
		 * cancel may be stacked on top of it, flagged as overlap
		 */
		switch (mstype) {
		case DLM_MSG_UNLOCK:
			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
			break;
		case DLM_MSG_CANCEL:
			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
			break;
		default:
			error = -EBUSY;
			goto out;
		}
		lkb->lkb_wait_count++;
		hold_lkb(lkb);

		log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
			  lkb->lkb_wait_count, dlm_iflags_val(lkb));
		goto out;
	}

	DLM_ASSERT(!lkb->lkb_wait_count,
		   dlm_print_lkb(lkb);
		   printk("wait_count %d\n", lkb->lkb_wait_count););

	/* first outstanding message: record it and link into ls_waiters */
	lkb->lkb_wait_count++;
	lkb->lkb_wait_type = mstype;
	lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
	hold_lkb(lkb);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
	if (error)
		log_error(ls, "addwait error %x %d flags %x %d %d %s",
			  lkb->lkb_id, error, dlm_iflags_val(lkb), mstype,
			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
	spin_unlock_bh(&ls->ls_waiters_lock);
	return error;
}
1769 
/* We clear the RESEND flag because we might be taking an lkb off the waiters
   list as part of process_requestqueue (e.g. a lookup that has an optimized
   request reply on the requestqueue) between dlm_recover_waiters_pre() which
   set RESEND and dlm_recover_waiters_post() */

/*
 * _remove_from_waiters - account for a reply of type @mstype on @lkb.
 *
 * @ms may be NULL (remove_from_waiters() passes NULL); it is only read
 * for the convert-reply result and for the nodeid in the error log.
 * The function takes no lock itself; the visible callers either hold
 * ls_waiters_lock or run in the recovery context.
 *
 * Returns 0 when a wait was consumed (lkb_wait_count decremented, the
 * matching reference dropped, and the lkb unlinked from the waiters
 * list once the count reaches zero).  Returns -1 when the reply does
 * not match any recorded wait.
 */
static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
				const struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int overlap_done = 0;

	if (mstype == DLM_MSG_UNLOCK_REPLY &&
	    test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
		log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
		overlap_done = 1;
		goto out_del;
	}

	if (mstype == DLM_MSG_CANCEL_REPLY &&
	    test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
		log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
		overlap_done = 1;
		goto out_del;
	}

	/* Cancel state was preemptively cleared by a successful convert,
	   see next comment, nothing to do. */

	if ((mstype == DLM_MSG_CANCEL_REPLY) &&
	    (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
		log_debug(ls, "remwait %x cancel_reply wait_type %d",
			  lkb->lkb_id, lkb->lkb_wait_type);
		return -1;
	}

	/* Remove for the convert reply, and preemptively remove for the
	   cancel reply.  A convert has been granted while there's still
	   an outstanding cancel on it (the cancel is moot and the result
	   in the cancel reply should be 0).  We preempt the cancel reply
	   because the app gets the convert result and then can follow up
	   with another op, like convert.  This subsequent op would see the
	   lingering state of the cancel and fail with -EBUSY. */

	if ((mstype == DLM_MSG_CONVERT_REPLY) &&
	    (lkb->lkb_wait_type == DLM_MSG_CONVERT) && ms && !ms->m_result &&
	    test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
		log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
			  lkb->lkb_id);
		/* this decrement/unhold pays for the convert itself; the
		 * one at out_del then pays for the zapped cancel
		 */
		lkb->lkb_wait_type = 0;
		lkb->lkb_wait_count--;
		unhold_lkb(lkb);
		goto out_del;
	}

	/* N.B. type of reply may not always correspond to type of original
	   msg due to lookup->request optimization, verify others? */

	if (lkb->lkb_wait_type) {
		lkb->lkb_wait_type = 0;
		goto out_del;
	}

	log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
		  lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
		  lkb->lkb_remid, mstype, dlm_iflags_val(lkb));
	return -1;

 out_del:
	/* the force-unlock/cancel has completed and we haven't recvd a reply
	   to the op that was in progress prior to the unlock/cancel; we
	   give up on any reply to the earlier op.  FIXME: not sure when/how
	   this would happen */

	if (overlap_done && lkb->lkb_wait_type) {
		log_error(ls, "remwait error %x reply %d wait_type %d overlap",
			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
		lkb->lkb_wait_count--;
		unhold_lkb(lkb);
		lkb->lkb_wait_type = 0;
	}

	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

	clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
	lkb->lkb_wait_count--;
	if (!lkb->lkb_wait_count)
		list_del_init(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
	return 0;
}
1860 
1861 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1862 {
1863 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1864 	int error;
1865 
1866 	spin_lock_bh(&ls->ls_waiters_lock);
1867 	error = _remove_from_waiters(lkb, mstype, NULL);
1868 	spin_unlock_bh(&ls->ls_waiters_lock);
1869 	return error;
1870 }
1871 
1872 /* Handles situations where we might be processing a "fake" or "local" reply in
1873  * the recovery context which stops any locking activity. Only debugfs might
1874  * change the lockspace waiters but they will held the recovery lock to ensure
1875  * remove_from_waiters_ms() in local case will be the only user manipulating the
1876  * lockspace waiters in recovery context.
1877  */
1878 
1879 static int remove_from_waiters_ms(struct dlm_lkb *lkb,
1880 				  const struct dlm_message *ms, bool local)
1881 {
1882 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1883 	int error;
1884 
1885 	if (!local)
1886 		spin_lock_bh(&ls->ls_waiters_lock);
1887 	else
1888 		WARN_ON_ONCE(!rwsem_is_locked(&ls->ls_in_recovery) ||
1889 			     !dlm_locking_stopped(ls));
1890 	error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
1891 	if (!local)
1892 		spin_unlock_bh(&ls->ls_waiters_lock);
1893 	return error;
1894 }
1895 
1896 /* lkb is master or local copy */
1897 
1898 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1899 {
1900 	int b, len = r->res_ls->ls_lvblen;
1901 
1902 	/* b=1 lvb returned to caller
1903 	   b=0 lvb written to rsb or invalidated
1904 	   b=-1 do nothing */
1905 
1906 	b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1907 
1908 	if (b == 1) {
1909 		if (!lkb->lkb_lvbptr)
1910 			return;
1911 
1912 		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1913 			return;
1914 
1915 		if (!r->res_lvbptr)
1916 			return;
1917 
1918 		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1919 		lkb->lkb_lvbseq = r->res_lvbseq;
1920 
1921 	} else if (b == 0) {
1922 		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1923 			rsb_set_flag(r, RSB_VALNOTVALID);
1924 			return;
1925 		}
1926 
1927 		if (!lkb->lkb_lvbptr)
1928 			return;
1929 
1930 		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1931 			return;
1932 
1933 		if (!r->res_lvbptr)
1934 			r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1935 
1936 		if (!r->res_lvbptr)
1937 			return;
1938 
1939 		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1940 		r->res_lvbseq++;
1941 		lkb->lkb_lvbseq = r->res_lvbseq;
1942 		rsb_clear_flag(r, RSB_VALNOTVALID);
1943 	}
1944 
1945 	if (rsb_flag(r, RSB_VALNOTVALID))
1946 		set_bit(DLM_SBF_VALNOTVALID_BIT, &lkb->lkb_sbflags);
1947 }
1948 
1949 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1950 {
1951 	if (lkb->lkb_grmode < DLM_LOCK_PW)
1952 		return;
1953 
1954 	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1955 		rsb_set_flag(r, RSB_VALNOTVALID);
1956 		return;
1957 	}
1958 
1959 	if (!lkb->lkb_lvbptr)
1960 		return;
1961 
1962 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1963 		return;
1964 
1965 	if (!r->res_lvbptr)
1966 		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1967 
1968 	if (!r->res_lvbptr)
1969 		return;
1970 
1971 	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1972 	r->res_lvbseq++;
1973 	rsb_clear_flag(r, RSB_VALNOTVALID);
1974 }
1975 
1976 /* lkb is process copy (pc) */
1977 
1978 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1979 			    const struct dlm_message *ms)
1980 {
1981 	int b;
1982 
1983 	if (!lkb->lkb_lvbptr)
1984 		return;
1985 
1986 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1987 		return;
1988 
1989 	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1990 	if (b == 1) {
1991 		int len = receive_extralen(ms);
1992 		if (len > r->res_ls->ls_lvblen)
1993 			len = r->res_ls->ls_lvblen;
1994 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1995 		lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
1996 	}
1997 }
1998 
1999 /* Manipulate lkb's on rsb's convert/granted/waiting queues
2000    remove_lock -- used for unlock, removes lkb from granted
2001    revert_lock -- used for cancel, moves lkb from convert to granted
2002    grant_lock  -- used for request and convert, adds lkb to granted or
2003                   moves lkb from convert or waiting to granted
2004 
2005    Each of these is used for master or local copy lkb's.  There is
2006    also a _pc() variation used to make the corresponding change on
2007    a process copy (pc) lkb. */
2008 
2009 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2010 {
2011 	del_lkb(r, lkb);
2012 	lkb->lkb_grmode = DLM_LOCK_IV;
2013 	/* this unhold undoes the original ref from create_lkb()
2014 	   so this leads to the lkb being freed */
2015 	unhold_lkb(lkb);
2016 }
2017 
/*
 * Remove a lock on unlock: apply the unlock-time lvb handling to the
 * rsb first, then take the lkb off its queue via _remove_lock().
 */
static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);
	_remove_lock(r, lkb);
}
2023 
/*
 * Process copy variant of remove_lock(): only the queue removal is done,
 * without any lvb handling.
 */
static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}
2028 
2029 /* returns: 0 did nothing
2030 	    1 moved lock to granted
2031 	   -1 removed lock */
2032 
2033 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2034 {
2035 	int rv = 0;
2036 
2037 	lkb->lkb_rqmode = DLM_LOCK_IV;
2038 
2039 	switch (lkb->lkb_status) {
2040 	case DLM_LKSTS_GRANTED:
2041 		break;
2042 	case DLM_LKSTS_CONVERT:
2043 		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2044 		rv = 1;
2045 		break;
2046 	case DLM_LKSTS_WAITING:
2047 		del_lkb(r, lkb);
2048 		lkb->lkb_grmode = DLM_LOCK_IV;
2049 		/* this unhold undoes the original ref from create_lkb()
2050 		   so this leads to the lkb being freed */
2051 		unhold_lkb(lkb);
2052 		rv = -1;
2053 		break;
2054 	default:
2055 		log_print("invalid status for revert %d", lkb->lkb_status);
2056 	}
2057 	return rv;
2058 }
2059 
/*
 * Process copy variant of revert_lock(); the handling is identical, so
 * this simply forwards and hands back revert_lock()'s result.
 */
static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int moved = revert_lock(r, lkb);

	return moved;
}
2064 
2065 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2066 {
2067 	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2068 		lkb->lkb_grmode = lkb->lkb_rqmode;
2069 		if (lkb->lkb_status)
2070 			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2071 		else
2072 			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2073 	}
2074 
2075 	lkb->lkb_rqmode = DLM_LOCK_IV;
2076 	lkb->lkb_highbast = 0;
2077 }
2078 
/*
 * Grant a lock: run the lvb handling for a lock/convert on the rsb and
 * lkb, then update the lkb's modes and queue via _grant_lock().
 */
static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_lock(r, lkb);
	_grant_lock(r, lkb);
}
2084 
/*
 * Process copy variant of grant_lock(): the lvb is taken from the
 * received message ms instead of from the rsb.
 */
static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  const struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);
	_grant_lock(r, lkb);
}
2091 
2092 /* called by grant_pending_locks() which means an async grant message must
2093    be sent to the requesting node in addition to granting the lock if the
2094    lkb belongs to a remote node. */
2095 
/*
 * Grant a previously queued lock and notify its owner: a master copy
 * lkb gets a grant message sent, any other lkb gets a completion
 * callback queued with status 0.
 */
static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);

	if (!is_master_copy(lkb)) {
		queue_cast(r, lkb, 0);
		return;
	}

	send_grant(r, lkb);
}
2104 
2105 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2106    change the granted/requested modes.  We're munging things accordingly in
2107    the process copy.
2108    CONVDEADLK: our grmode may have been forced down to NL to resolve a
2109    conversion deadlock
2110    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2111    compatible with other granted locks */
2112 
2113 static void munge_demoted(struct dlm_lkb *lkb)
2114 {
2115 	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2116 		log_print("munge_demoted %x invalid modes gr %d rq %d",
2117 			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2118 		return;
2119 	}
2120 
2121 	lkb->lkb_grmode = DLM_LOCK_NL;
2122 }
2123 
2124 static void munge_altmode(struct dlm_lkb *lkb, const struct dlm_message *ms)
2125 {
2126 	if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
2127 	    ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
2128 		log_print("munge_altmode %x invalid reply type %d",
2129 			  lkb->lkb_id, le32_to_cpu(ms->m_type));
2130 		return;
2131 	}
2132 
2133 	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2134 		lkb->lkb_rqmode = DLM_LOCK_PR;
2135 	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2136 		lkb->lkb_rqmode = DLM_LOCK_CW;
2137 	else {
2138 		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2139 		dlm_print_lkb(lkb);
2140 	}
2141 }
2142 
2143 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2144 {
2145 	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2146 					   lkb_statequeue);
2147 	if (lkb->lkb_id == first->lkb_id)
2148 		return 1;
2149 
2150 	return 0;
2151 }
2152 
2153 /* Check if the given lkb conflicts with another lkb on the queue. */
2154 
2155 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2156 {
2157 	struct dlm_lkb *this;
2158 
2159 	list_for_each_entry(this, head, lkb_statequeue) {
2160 		if (this == lkb)
2161 			continue;
2162 		if (!modes_compat(this, lkb))
2163 			return 1;
2164 	}
2165 	return 0;
2166 }
2167 
2168 /*
2169  * "A conversion deadlock arises with a pair of lock requests in the converting
2170  * queue for one resource.  The granted mode of each lock blocks the requested
2171  * mode of the other lock."
2172  *
2173  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2174  * convert queue from being granted, then deadlk/demote lkb.
2175  *
2176  * Example:
2177  * Granted Queue: empty
2178  * Convert Queue: NL->EX (first lock)
2179  *                PR->EX (second lock)
2180  *
2181  * The first lock can't be granted because of the granted mode of the second
2182  * lock and the second lock can't be granted because it's not first in the
2183  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2184  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2185  * flag set and return DEMOTED in the lksb flags.
2186  *
2187  * Originally, this function detected conv-deadlk in a more limited scope:
2188  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2189  * - if lkb1 was the first entry in the queue (not just earlier), and was
2190  *   blocked by the granted mode of lkb2, and there was nothing on the
2191  *   granted queue preventing lkb1 from being granted immediately, i.e.
2192  *   lkb2 was the only thing preventing lkb1 from being granted.
2193  *
2194  * That second condition meant we'd only say there was conv-deadlk if
2195  * resolving it (by demotion) would lead to the first lock on the convert
2196  * queue being granted right away.  It allowed conversion deadlocks to exist
2197  * between locks on the convert queue while they couldn't be granted anyway.
2198  *
2199  * Now, we detect and take action on conversion deadlocks immediately when
2200  * they're created, even if they may not be immediately consequential.  If
2201  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2202  * mode that would prevent lkb1's conversion from being granted, we do a
2203  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2204  * I think this means that the lkb_is_ahead condition below should always
2205  * be zero, i.e. there will never be conv-deadlk between two locks that are
2206  * both already on the convert queue.
2207  */
2208 
2209 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2210 {
2211 	struct dlm_lkb *lkb1;
2212 	int lkb_is_ahead = 0;
2213 
2214 	list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2215 		if (lkb1 == lkb2) {
2216 			lkb_is_ahead = 1;
2217 			continue;
2218 		}
2219 
2220 		if (!lkb_is_ahead) {
2221 			if (!modes_compat(lkb2, lkb1))
2222 				return 1;
2223 		} else {
2224 			if (!modes_compat(lkb2, lkb1) &&
2225 			    !modes_compat(lkb1, lkb2))
2226 				return 1;
2227 		}
2228 	}
2229 	return 0;
2230 }
2231 
2232 /*
2233  * Return 1 if the lock can be granted, 0 otherwise.
2234  * Also detect and resolve conversion deadlocks.
2235  *
2236  * lkb is the lock to be granted
2237  *
2238  * now is 1 if the function is being called in the context of the
2239  * immediate request, it is 0 if called later, after the lock has been
2240  * queued.
2241  *
2242  * recover is 1 if dlm_recover_grant() is trying to grant conversions
2243  * after recovery.
2244  *
2245  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2246  */
2247 
2248 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2249 			   int recover)
2250 {
2251 	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2252 
2253 	/*
2254 	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
2255 	 * a new request for a NL mode lock being blocked.
2256 	 *
2257 	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2258 	 * request, then it would be granted.  In essence, the use of this flag
2259 	 * tells the Lock Manager to expedite theis request by not considering
2260 	 * what may be in the CONVERTING or WAITING queues...  As of this
2261 	 * writing, the EXPEDITE flag can be used only with new requests for NL
2262 	 * mode locks.  This flag is not valid for conversion requests.
2263 	 *
2264 	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
2265 	 * conversion or used with a non-NL requested mode.  We also know an
2266 	 * EXPEDITE request is always granted immediately, so now must always
2267 	 * be 1.  The full condition to grant an expedite request: (now &&
2268 	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2269 	 * therefore be shortened to just checking the flag.
2270 	 */
2271 
2272 	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2273 		return 1;
2274 
2275 	/*
2276 	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2277 	 * added to the remaining conditions.
2278 	 */
2279 
2280 	if (queue_conflict(&r->res_grantqueue, lkb))
2281 		return 0;
2282 
2283 	/*
2284 	 * 6-3: By default, a conversion request is immediately granted if the
2285 	 * requested mode is compatible with the modes of all other granted
2286 	 * locks
2287 	 */
2288 
2289 	if (queue_conflict(&r->res_convertqueue, lkb))
2290 		return 0;
2291 
2292 	/*
2293 	 * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2294 	 * locks for a recovered rsb, on which lkb's have been rebuilt.
2295 	 * The lkb's may have been rebuilt on the queues in a different
2296 	 * order than they were in on the previous master.  So, granting
2297 	 * queued conversions in order after recovery doesn't make sense
2298 	 * since the order hasn't been preserved anyway.  The new order
2299 	 * could also have created a new "in place" conversion deadlock.
2300 	 * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2301 	 * After recovery, there would be no granted locks, and possibly
2302 	 * NL->EX, PR->EX, an in-place conversion deadlock.)  So, after
2303 	 * recovery, grant conversions without considering order.
2304 	 */
2305 
2306 	if (conv && recover)
2307 		return 1;
2308 
2309 	/*
2310 	 * 6-5: But the default algorithm for deciding whether to grant or
2311 	 * queue conversion requests does not by itself guarantee that such
2312 	 * requests are serviced on a "first come first serve" basis.  This, in
2313 	 * turn, can lead to a phenomenon known as "indefinate postponement".
2314 	 *
2315 	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
2316 	 * the system service employed to request a lock conversion.  This flag
2317 	 * forces certain conversion requests to be queued, even if they are
2318 	 * compatible with the granted modes of other locks on the same
2319 	 * resource.  Thus, the use of this flag results in conversion requests
2320 	 * being ordered on a "first come first servce" basis.
2321 	 *
2322 	 * DCT: This condition is all about new conversions being able to occur
2323 	 * "in place" while the lock remains on the granted queue (assuming
2324 	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
2325 	 * doesn't _have_ to go onto the convert queue where it's processed in
2326 	 * order.  The "now" variable is necessary to distinguish converts
2327 	 * being received and processed for the first time now, because once a
2328 	 * convert is moved to the conversion queue the condition below applies
2329 	 * requiring fifo granting.
2330 	 */
2331 
2332 	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2333 		return 1;
2334 
2335 	/*
2336 	 * Even if the convert is compat with all granted locks,
2337 	 * QUECVT forces it behind other locks on the convert queue.
2338 	 */
2339 
2340 	if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2341 		if (list_empty(&r->res_convertqueue))
2342 			return 1;
2343 		else
2344 			return 0;
2345 	}
2346 
2347 	/*
2348 	 * The NOORDER flag is set to avoid the standard vms rules on grant
2349 	 * order.
2350 	 */
2351 
2352 	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2353 		return 1;
2354 
2355 	/*
2356 	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2357 	 * granted until all other conversion requests ahead of it are granted
2358 	 * and/or canceled.
2359 	 */
2360 
2361 	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2362 		return 1;
2363 
2364 	/*
2365 	 * 6-4: By default, a new request is immediately granted only if all
2366 	 * three of the following conditions are satisfied when the request is
2367 	 * issued:
2368 	 * - The queue of ungranted conversion requests for the resource is
2369 	 *   empty.
2370 	 * - The queue of ungranted new requests for the resource is empty.
2371 	 * - The mode of the new request is compatible with the most
2372 	 *   restrictive mode of all granted locks on the resource.
2373 	 */
2374 
2375 	if (now && !conv && list_empty(&r->res_convertqueue) &&
2376 	    list_empty(&r->res_waitqueue))
2377 		return 1;
2378 
2379 	/*
2380 	 * 6-4: Once a lock request is in the queue of ungranted new requests,
2381 	 * it cannot be granted until the queue of ungranted conversion
2382 	 * requests is empty, all ungranted new requests ahead of it are
2383 	 * granted and/or canceled, and it is compatible with the granted mode
2384 	 * of the most restrictive lock granted on the resource.
2385 	 */
2386 
2387 	if (!now && !conv && list_empty(&r->res_convertqueue) &&
2388 	    first_in_list(lkb, &r->res_waitqueue))
2389 		return 1;
2390 
2391 	return 0;
2392 }
2393 
2394 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2395 			  int recover, int *err)
2396 {
2397 	int rv;
2398 	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2399 	int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2400 
2401 	if (err)
2402 		*err = 0;
2403 
2404 	rv = _can_be_granted(r, lkb, now, recover);
2405 	if (rv)
2406 		goto out;
2407 
2408 	/*
2409 	 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2410 	 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2411 	 * cancels one of the locks.
2412 	 */
2413 
2414 	if (is_convert && can_be_queued(lkb) &&
2415 	    conversion_deadlock_detect(r, lkb)) {
2416 		if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2417 			lkb->lkb_grmode = DLM_LOCK_NL;
2418 			set_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
2419 		} else if (err) {
2420 			*err = -EDEADLK;
2421 		} else {
2422 			log_print("can_be_granted deadlock %x now %d",
2423 				  lkb->lkb_id, now);
2424 			dlm_dump_rsb(r);
2425 		}
2426 		goto out;
2427 	}
2428 
2429 	/*
2430 	 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2431 	 * to grant a request in a mode other than the normal rqmode.  It's a
2432 	 * simple way to provide a big optimization to applications that can
2433 	 * use them.
2434 	 */
2435 
2436 	if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2437 		alt = DLM_LOCK_PR;
2438 	else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2439 		alt = DLM_LOCK_CW;
2440 
2441 	if (alt) {
2442 		lkb->lkb_rqmode = alt;
2443 		rv = _can_be_granted(r, lkb, now, 0);
2444 		if (rv)
2445 			set_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
2446 		else
2447 			lkb->lkb_rqmode = rqmode;
2448 	}
2449  out:
2450 	return rv;
2451 }
2452 
2453 /* Returns the highest requested mode of all blocked conversions; sets
2454    cw if there's a blocked conversion to DLM_LOCK_CW. */
2455 
2456 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2457 				 unsigned int *count)
2458 {
2459 	struct dlm_lkb *lkb, *s;
2460 	int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2461 	int hi, demoted, quit, grant_restart, demote_restart;
2462 	int deadlk;
2463 
2464 	quit = 0;
2465  restart:
2466 	grant_restart = 0;
2467 	demote_restart = 0;
2468 	hi = DLM_LOCK_IV;
2469 
2470 	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2471 		demoted = is_demoted(lkb);
2472 		deadlk = 0;
2473 
2474 		if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2475 			grant_lock_pending(r, lkb);
2476 			grant_restart = 1;
2477 			if (count)
2478 				(*count)++;
2479 			continue;
2480 		}
2481 
2482 		if (!demoted && is_demoted(lkb)) {
2483 			log_print("WARN: pending demoted %x node %d %s",
2484 				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2485 			demote_restart = 1;
2486 			continue;
2487 		}
2488 
2489 		if (deadlk) {
2490 			/*
2491 			 * If DLM_LKB_NODLKWT flag is set and conversion
2492 			 * deadlock is detected, we request blocking AST and
2493 			 * down (or cancel) conversion.
2494 			 */
2495 			if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2496 				if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2497 					queue_bast(r, lkb, lkb->lkb_rqmode);
2498 					lkb->lkb_highbast = lkb->lkb_rqmode;
2499 				}
2500 			} else {
2501 				log_print("WARN: pending deadlock %x node %d %s",
2502 					  lkb->lkb_id, lkb->lkb_nodeid,
2503 					  r->res_name);
2504 				dlm_dump_rsb(r);
2505 			}
2506 			continue;
2507 		}
2508 
2509 		hi = max_t(int, lkb->lkb_rqmode, hi);
2510 
2511 		if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2512 			*cw = 1;
2513 	}
2514 
2515 	if (grant_restart)
2516 		goto restart;
2517 	if (demote_restart && !quit) {
2518 		quit = 1;
2519 		goto restart;
2520 	}
2521 
2522 	return max_t(int, high, hi);
2523 }
2524 
2525 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2526 			      unsigned int *count)
2527 {
2528 	struct dlm_lkb *lkb, *s;
2529 
2530 	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2531 		if (can_be_granted(r, lkb, 0, 0, NULL)) {
2532 			grant_lock_pending(r, lkb);
2533 			if (count)
2534 				(*count)++;
2535 		} else {
2536 			high = max_t(int, lkb->lkb_rqmode, high);
2537 			if (lkb->lkb_rqmode == DLM_LOCK_CW)
2538 				*cw = 1;
2539 		}
2540 	}
2541 
2542 	return high;
2543 }
2544 
2545 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2546    on either the convert or waiting queue.
2547    high is the largest rqmode of all locks blocked on the convert or
2548    waiting queue. */
2549 
2550 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2551 {
2552 	if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2553 		if (gr->lkb_highbast < DLM_LOCK_EX)
2554 			return 1;
2555 		return 0;
2556 	}
2557 
2558 	if (gr->lkb_highbast < high &&
2559 	    !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2560 		return 1;
2561 	return 0;
2562 }
2563 
2564 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2565 {
2566 	struct dlm_lkb *lkb, *s;
2567 	int high = DLM_LOCK_IV;
2568 	int cw = 0;
2569 
2570 	if (!is_master(r)) {
2571 		log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2572 		dlm_dump_rsb(r);
2573 		return;
2574 	}
2575 
2576 	high = grant_pending_convert(r, high, &cw, count);
2577 	high = grant_pending_wait(r, high, &cw, count);
2578 
2579 	if (high == DLM_LOCK_IV)
2580 		return;
2581 
2582 	/*
2583 	 * If there are locks left on the wait/convert queue then send blocking
2584 	 * ASTs to granted locks based on the largest requested mode (high)
2585 	 * found above.
2586 	 */
2587 
2588 	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2589 		if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2590 			if (cw && high == DLM_LOCK_PR &&
2591 			    lkb->lkb_grmode == DLM_LOCK_PR)
2592 				queue_bast(r, lkb, DLM_LOCK_CW);
2593 			else
2594 				queue_bast(r, lkb, high);
2595 			lkb->lkb_highbast = high;
2596 		}
2597 	}
2598 }
2599 
2600 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2601 {
2602 	if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2603 	    (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2604 		if (gr->lkb_highbast < DLM_LOCK_EX)
2605 			return 1;
2606 		return 0;
2607 	}
2608 
2609 	if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2610 		return 1;
2611 	return 0;
2612 }
2613 
2614 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2615 			    struct dlm_lkb *lkb)
2616 {
2617 	struct dlm_lkb *gr;
2618 
2619 	list_for_each_entry(gr, head, lkb_statequeue) {
2620 		/* skip self when sending basts to convertqueue */
2621 		if (gr == lkb)
2622 			continue;
2623 		if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2624 			queue_bast(r, gr, lkb->lkb_rqmode);
2625 			gr->lkb_highbast = lkb->lkb_rqmode;
2626 		}
2627 	}
2628 }
2629 
2630 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2631 {
2632 	send_bast_queue(r, &r->res_grantqueue, lkb);
2633 }
2634 
2635 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2636 {
2637 	send_bast_queue(r, &r->res_grantqueue, lkb);
2638 	send_bast_queue(r, &r->res_convertqueue, lkb);
2639 }
2640 
2641 /* set_master(r, lkb) -- set the master nodeid of a resource
2642 
2643    The purpose of this function is to set the nodeid field in the given
2644    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2645    known, it can just be copied to the lkb and the function will return
2646    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2647    before it can be copied to the lkb.
2648 
2649    When the rsb nodeid is being looked up remotely, the initial lkb
2650    causing the lookup is kept on the ls_waiters list waiting for the
2651    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2652    on the rsb's res_lookup list until the master is verified.
2653 
2654    Return values:
2655    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2656    1: the rsb master is not available and the lkb has been placed on
2657       a wait queue
2658 */
2659 
2660 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2661 {
2662 	int our_nodeid = dlm_our_nodeid();
2663 
2664 	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2665 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2666 		r->res_first_lkid = lkb->lkb_id;
2667 		lkb->lkb_nodeid = r->res_nodeid;
2668 		return 0;
2669 	}
2670 
2671 	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2672 		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2673 		return 1;
2674 	}
2675 
2676 	if (r->res_master_nodeid == our_nodeid) {
2677 		lkb->lkb_nodeid = 0;
2678 		return 0;
2679 	}
2680 
2681 	if (r->res_master_nodeid) {
2682 		lkb->lkb_nodeid = r->res_master_nodeid;
2683 		return 0;
2684 	}
2685 
2686 	if (dlm_dir_nodeid(r) == our_nodeid) {
2687 		/* This is a somewhat unusual case; find_rsb will usually
2688 		   have set res_master_nodeid when dir nodeid is local, but
2689 		   there are cases where we become the dir node after we've
2690 		   past find_rsb and go through _request_lock again.
2691 		   confirm_master() or process_lookup_list() needs to be
2692 		   called after this. */
2693 		log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2694 			  lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2695 			  r->res_name);
2696 		r->res_master_nodeid = our_nodeid;
2697 		r->res_nodeid = 0;
2698 		lkb->lkb_nodeid = 0;
2699 		return 0;
2700 	}
2701 
2702 	r->res_first_lkid = lkb->lkb_id;
2703 	send_lookup(r, lkb);
2704 	return 1;
2705 }
2706 
2707 static void process_lookup_list(struct dlm_rsb *r)
2708 {
2709 	struct dlm_lkb *lkb, *safe;
2710 
2711 	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2712 		list_del_init(&lkb->lkb_rsb_lookup);
2713 		_request_lock(r, lkb);
2714 	}
2715 }
2716 
2717 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2718 
2719 static void confirm_master(struct dlm_rsb *r, int error)
2720 {
2721 	struct dlm_lkb *lkb;
2722 
2723 	if (!r->res_first_lkid)
2724 		return;
2725 
2726 	switch (error) {
2727 	case 0:
2728 	case -EINPROGRESS:
2729 		r->res_first_lkid = 0;
2730 		process_lookup_list(r);
2731 		break;
2732 
2733 	case -EAGAIN:
2734 	case -EBADR:
2735 	case -ENOTBLK:
2736 		/* the remote request failed and won't be retried (it was
2737 		   a NOQUEUE, or has been canceled/unlocked); make a waiting
2738 		   lkb the first_lkid */
2739 
2740 		r->res_first_lkid = 0;
2741 
2742 		if (!list_empty(&r->res_lookup)) {
2743 			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2744 					 lkb_rsb_lookup);
2745 			list_del_init(&lkb->lkb_rsb_lookup);
2746 			r->res_first_lkid = lkb->lkb_id;
2747 			_request_lock(r, lkb);
2748 		}
2749 		break;
2750 
2751 	default:
2752 		log_error(r->res_ls, "confirm_master unknown error %d", error);
2753 	}
2754 }
2755 
2756 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2757 			 int namelen, void (*ast)(void *astparam),
2758 			 void *astparam,
2759 			 void (*bast)(void *astparam, int mode),
2760 			 struct dlm_args *args)
2761 {
2762 	int rv = -EINVAL;
2763 
2764 	/* check for invalid arg usage */
2765 
2766 	if (mode < 0 || mode > DLM_LOCK_EX)
2767 		goto out;
2768 
2769 	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2770 		goto out;
2771 
2772 	if (flags & DLM_LKF_CANCEL)
2773 		goto out;
2774 
2775 	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2776 		goto out;
2777 
2778 	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2779 		goto out;
2780 
2781 	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2782 		goto out;
2783 
2784 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2785 		goto out;
2786 
2787 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2788 		goto out;
2789 
2790 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2791 		goto out;
2792 
2793 	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2794 		goto out;
2795 
2796 	if (!ast || !lksb)
2797 		goto out;
2798 
2799 	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2800 		goto out;
2801 
2802 	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2803 		goto out;
2804 
2805 	/* these args will be copied to the lkb in validate_lock_args,
2806 	   it cannot be done now because when converting locks, fields in
2807 	   an active lkb cannot be modified before locking the rsb */
2808 
2809 	args->flags = flags;
2810 	args->astfn = ast;
2811 	args->astparam = astparam;
2812 	args->bastfn = bast;
2813 	args->mode = mode;
2814 	args->lksb = lksb;
2815 	rv = 0;
2816  out:
2817 	return rv;
2818 }
2819 
2820 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2821 {
2822 	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2823  		      DLM_LKF_FORCEUNLOCK))
2824 		return -EINVAL;
2825 
2826 	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2827 		return -EINVAL;
2828 
2829 	args->flags = flags;
2830 	args->astparam = astarg;
2831 	return 0;
2832 }
2833 
2834 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2835 			      struct dlm_args *args)
2836 {
2837 	int rv = -EBUSY;
2838 
2839 	if (args->flags & DLM_LKF_CONVERT) {
2840 		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2841 			goto out;
2842 
2843 		/* lock not allowed if there's any op in progress */
2844 		if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2845 			goto out;
2846 
2847 		if (is_overlap(lkb))
2848 			goto out;
2849 
2850 		rv = -EINVAL;
2851 		if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
2852 			goto out;
2853 
2854 		if (args->flags & DLM_LKF_QUECVT &&
2855 		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2856 			goto out;
2857 	}
2858 
2859 	lkb->lkb_exflags = args->flags;
2860 	dlm_set_sbflags_val(lkb, 0);
2861 	lkb->lkb_astfn = args->astfn;
2862 	lkb->lkb_astparam = args->astparam;
2863 	lkb->lkb_bastfn = args->bastfn;
2864 	lkb->lkb_rqmode = args->mode;
2865 	lkb->lkb_lksb = args->lksb;
2866 	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2867 	lkb->lkb_ownpid = (int) current->pid;
2868 	rv = 0;
2869  out:
2870 	switch (rv) {
2871 	case 0:
2872 		break;
2873 	case -EINVAL:
2874 		/* annoy the user because dlm usage is wrong */
2875 		WARN_ON(1);
2876 		log_error(ls, "%s %d %x %x %x %d %d %s", __func__,
2877 			  rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2878 			  lkb->lkb_status, lkb->lkb_wait_type,
2879 			  lkb->lkb_resource->res_name);
2880 		break;
2881 	default:
2882 		log_debug(ls, "%s %d %x %x %x %d %d %s", __func__,
2883 			  rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2884 			  lkb->lkb_status, lkb->lkb_wait_type,
2885 			  lkb->lkb_resource->res_name);
2886 		break;
2887 	}
2888 
2889 	return rv;
2890 }
2891 
2892 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2893    for success */
2894 
2895 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2896    because there may be a lookup in progress and it's valid to do
2897    cancel/unlockf on it */
2898 
/*
 * Decide whether an unlock, force-unlock or cancel described by args may
 * proceed on lkb and, if it may, copy the caller's args into the lkb.
 *
 * Returns:
 *   0        the operation should go ahead; args->flags have been OR-ed
 *            into lkb_exflags, sbflags cleared and astparam stored
 *   -EBUSY   something else is in progress on the lkb; for CANCEL and
 *            FORCEUNLOCK an OVERLAP bit may have been set, or the lkb may
 *            have been completed directly off the rsb_lookup list
 *   -EINVAL  the lkb is a master copy, or the cancel/unlock conflicts with
 *            one already recorded or in flight (also triggers a WARN_ON)
 *   -ENOENT  the lkb is already end-of-life
 *
 * Every non-zero result is logged before returning.
 */
static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int rv = -EBUSY;

	/* normal unlock not allowed if there's any op in progress */
	if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) &&
	    (lkb->lkb_wait_type || lkb->lkb_wait_count))
		goto out;

	/* an lkb may be waiting for an rsb lookup to complete where the
	   lookup was initiated by another lock */

	if (!list_empty(&lkb->lkb_rsb_lookup)) {
		if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
			/* finish the lkb right here: take it off the lookup
			   list and queue the matching completion status */
			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
			list_del_init(&lkb->lkb_rsb_lookup);
			queue_cast(lkb->lkb_resource, lkb,
				   args->flags & DLM_LKF_CANCEL ?
				   -DLM_ECANCEL : -DLM_EUNLOCK);
			unhold_lkb(lkb); /* undoes create_lkb() */
		}
		/* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
		goto out;
	}

	/* from here on a bare "goto out" reports -EINVAL unless rv is
	   explicitly changed first */
	rv = -EINVAL;
	if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
		dlm_print_lkb(lkb);
		goto out;
	}

	/* an lkb may still exist even though the lock is EOL'ed due to a
	 * cancel, unlock or failed noqueue request; an app can't use these
	 * locks; return same error as if the lkid had not been found at all
	 */

	if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
		rv = -ENOENT;
		goto out;
	}

	/* cancel not allowed with another cancel/unlock in progress */

	if (args->flags & DLM_LKF_CANCEL) {
		/* a cancel is already recorded in exflags: -EINVAL */
		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
			goto out;

		/* an overlapping op is already pending: -EINVAL */
		if (is_overlap(lkb))
			goto out;

		/* lkb is flagged RESEND: remember the cancel as an overlap
		   instead of starting it now */
		if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
			rv = -EBUSY;
			goto out;
		}

		/* there's nothing to cancel */
		if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
		    !lkb->lkb_wait_type) {
			rv = -EBUSY;
			goto out;
		}

		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			/* waiting on a lookup/request reply: flag the cancel
			   as overlapping and report busy */
			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
		case DLM_MSG_CANCEL:
			/* an unlock or cancel is already in flight: -EINVAL */
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_CANCEL */
		goto out_ok;
	}

	/* do we need to allow a force-unlock if there's a normal unlock
	   already in progress?  in what conditions could the normal unlock
	   fail such that we'd want to send a force-unlock to be sure? */

	if (args->flags & DLM_LKF_FORCEUNLOCK) {
		/* a force-unlock is already recorded in exflags: -EINVAL */
		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
			goto out;

		/* an overlapping unlock is already pending: -EINVAL */
		if (is_overlap_unlock(lkb))
			goto out;

		/* lkb is flagged RESEND: remember the unlock as an overlap
		   instead of starting it now */
		if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
			rv = -EBUSY;
			goto out;
		}

		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			/* waiting on a lookup/request reply: flag the unlock
			   as overlapping and report busy */
			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
			/* an unlock is already in flight: -EINVAL */
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_UNLOCK */
	}

 out_ok:
	/* an overlapping op shouldn't blow away exflags from other op */
	lkb->lkb_exflags |= args->flags;
	dlm_set_sbflags_val(lkb, 0);
	lkb->lkb_astparam = args->astparam;
	rv = 0;
 out:
	/* success is silent; -EINVAL is logged as an error with a WARN_ON,
	   every other failure only at debug level */
	switch (rv) {
	case 0:
		break;
	case -EINVAL:
		/* annoy the user because dlm usage is wrong */
		WARN_ON(1);
		log_error(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
			  lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
			  args->flags, lkb->lkb_wait_type,
			  lkb->lkb_resource->res_name);
		break;
	default:
		log_debug(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
			  lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
			  args->flags, lkb->lkb_wait_type,
			  lkb->lkb_resource->res_name);
		break;
	}

	return rv;
}
3036 
3037 /*
3038  * Four stage 4 varieties:
3039  * do_request(), do_convert(), do_unlock(), do_cancel()
3040  * These are called on the master node for the given lock and
3041  * from the central locking logic.
3042  */
3043 
3044 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3045 {
3046 	int error = 0;
3047 
3048 	if (can_be_granted(r, lkb, 1, 0, NULL)) {
3049 		grant_lock(r, lkb);
3050 		queue_cast(r, lkb, 0);
3051 		goto out;
3052 	}
3053 
3054 	if (can_be_queued(lkb)) {
3055 		error = -EINPROGRESS;
3056 		add_lkb(r, lkb, DLM_LKSTS_WAITING);
3057 		goto out;
3058 	}
3059 
3060 	error = -EAGAIN;
3061 	queue_cast(r, lkb, -EAGAIN);
3062  out:
3063 	return error;
3064 }
3065 
3066 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3067 			       int error)
3068 {
3069 	switch (error) {
3070 	case -EAGAIN:
3071 		if (force_blocking_asts(lkb))
3072 			send_blocking_asts_all(r, lkb);
3073 		break;
3074 	case -EINPROGRESS:
3075 		send_blocking_asts(r, lkb);
3076 		break;
3077 	}
3078 }
3079 
3080 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3081 {
3082 	int error = 0;
3083 	int deadlk = 0;
3084 
3085 	/* changing an existing lock may allow others to be granted */
3086 
3087 	if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3088 		grant_lock(r, lkb);
3089 		queue_cast(r, lkb, 0);
3090 		goto out;
3091 	}
3092 
3093 	/* can_be_granted() detected that this lock would block in a conversion
3094 	   deadlock, so we leave it on the granted queue and return EDEADLK in
3095 	   the ast for the convert. */
3096 
3097 	if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
3098 		/* it's left on the granted queue */
3099 		revert_lock(r, lkb);
3100 		queue_cast(r, lkb, -EDEADLK);
3101 		error = -EDEADLK;
3102 		goto out;
3103 	}
3104 
3105 	/* is_demoted() means the can_be_granted() above set the grmode
3106 	   to NL, and left us on the granted queue.  This auto-demotion
3107 	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
3108 	   now grantable.  We have to try to grant other converting locks
3109 	   before we try again to grant this one. */
3110 
3111 	if (is_demoted(lkb)) {
3112 		grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3113 		if (_can_be_granted(r, lkb, 1, 0)) {
3114 			grant_lock(r, lkb);
3115 			queue_cast(r, lkb, 0);
3116 			goto out;
3117 		}
3118 		/* else fall through and move to convert queue */
3119 	}
3120 
3121 	if (can_be_queued(lkb)) {
3122 		error = -EINPROGRESS;
3123 		del_lkb(r, lkb);
3124 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3125 		goto out;
3126 	}
3127 
3128 	error = -EAGAIN;
3129 	queue_cast(r, lkb, -EAGAIN);
3130  out:
3131 	return error;
3132 }
3133 
3134 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3135 			       int error)
3136 {
3137 	switch (error) {
3138 	case 0:
3139 		grant_pending_locks(r, NULL);
3140 		/* grant_pending_locks also sends basts */
3141 		break;
3142 	case -EAGAIN:
3143 		if (force_blocking_asts(lkb))
3144 			send_blocking_asts_all(r, lkb);
3145 		break;
3146 	case -EINPROGRESS:
3147 		send_blocking_asts(r, lkb);
3148 		break;
3149 	}
3150 }
3151 
3152 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3153 {
3154 	remove_lock(r, lkb);
3155 	queue_cast(r, lkb, -DLM_EUNLOCK);
3156 	return -DLM_EUNLOCK;
3157 }
3158 
3159 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3160 			      int error)
3161 {
3162 	grant_pending_locks(r, NULL);
3163 }
3164 
3165 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3166 
3167 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3168 {
3169 	int error;
3170 
3171 	error = revert_lock(r, lkb);
3172 	if (error) {
3173 		queue_cast(r, lkb, -DLM_ECANCEL);
3174 		return -DLM_ECANCEL;
3175 	}
3176 	return 0;
3177 }
3178 
3179 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3180 			      int error)
3181 {
3182 	if (error)
3183 		grant_pending_locks(r, NULL);
3184 }
3185 
3186 /*
3187  * Four stage 3 varieties:
3188  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3189  */
3190 
3191 /* add a new lkb to a possibly new rsb, called by requesting process */
3192 
/*
 * Stage 3 request: settle the master for r via set_master() (which sets the
 * lkb nodeid from r), then either send the request to the remote master or
 * run do_request() locally.
 *
 * A negative set_master() result is returned as is; a positive one ends the
 * call with 0 and no request is sent or processed here.
 */
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	rv = set_master(r, lkb);
	if (rv < 0)
		return rv;
	if (rv > 0)
		return 0;

	/* receive_request() calls do_request() on remote node */
	if (is_remote(r))
		return send_request(r, lkb);

	/* for remote locks the request_reply is sent between do_request
	   and do_request_effects; locally they simply run back to back */
	rv = do_request(r, lkb);
	do_request_effects(r, lkb, rv);
	return rv;
}
3219 
3220 /* change some property of an existing lkb, e.g. mode */
3221 
/*
 * Stage 3 convert: send the conversion to the remote master, or run
 * do_convert() followed by do_convert_effects() when r is local.
 * Returns the result of whichever path was taken.
 */
static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	/* receive_convert() calls do_convert() on remote node */
	if (is_remote(r))
		return send_convert(r, lkb);

	/* for remote locks the convert_reply is sent between do_convert
	   and do_convert_effects */
	rv = do_convert(r, lkb);
	do_convert_effects(r, lkb, rv);
	return rv;
}
3238 
3239 /* remove an existing lkb from the granted queue */
3240 
/*
 * Stage 3 unlock: send the unlock to the remote master, or run do_unlock()
 * followed by do_unlock_effects() when r is local.
 * Returns the result of whichever path was taken.
 */
static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	/* receive_unlock() calls do_unlock() on remote node */
	if (is_remote(r))
		return send_unlock(r, lkb);

	/* for remote locks the unlock_reply is sent between do_unlock
	   and do_unlock_effects */
	rv = do_unlock(r, lkb);
	do_unlock_effects(r, lkb, rv);
	return rv;
}
3257 
3258 /* remove an existing lkb from the convert or wait queue */
3259 
/*
 * Stage 3 cancel: send the cancel to the remote master, or run do_cancel()
 * followed by do_cancel_effects() when r is local.
 * Returns the result of whichever path was taken.
 */
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	/* receive_cancel() calls do_cancel() on remote node */
	if (is_remote(r))
		return send_cancel(r, lkb);

	/* for remote locks the cancel_reply is sent between do_cancel
	   and do_cancel_effects */
	rv = do_cancel(r, lkb);
	do_cancel_effects(r, lkb, rv);
	return rv;
}
3276 
3277 /*
3278  * Four stage 2 varieties:
3279  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3280  */
3281 
3282 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3283 			const void *name, int len,
3284 			struct dlm_args *args)
3285 {
3286 	struct dlm_rsb *r;
3287 	int error;
3288 
3289 	error = validate_lock_args(ls, lkb, args);
3290 	if (error)
3291 		return error;
3292 
3293 	error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3294 	if (error)
3295 		return error;
3296 
3297 	lock_rsb(r);
3298 
3299 	attach_lkb(r, lkb);
3300 	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3301 
3302 	error = _request_lock(r, lkb);
3303 
3304 	unlock_rsb(r);
3305 	put_rsb(r);
3306 	return error;
3307 }
3308 
3309 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3310 			struct dlm_args *args)
3311 {
3312 	struct dlm_rsb *r;
3313 	int error;
3314 
3315 	r = lkb->lkb_resource;
3316 
3317 	hold_rsb(r);
3318 	lock_rsb(r);
3319 
3320 	error = validate_lock_args(ls, lkb, args);
3321 	if (error)
3322 		goto out;
3323 
3324 	error = _convert_lock(r, lkb);
3325  out:
3326 	unlock_rsb(r);
3327 	put_rsb(r);
3328 	return error;
3329 }
3330 
3331 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3332 		       struct dlm_args *args)
3333 {
3334 	struct dlm_rsb *r;
3335 	int error;
3336 
3337 	r = lkb->lkb_resource;
3338 
3339 	hold_rsb(r);
3340 	lock_rsb(r);
3341 
3342 	error = validate_unlock_args(lkb, args);
3343 	if (error)
3344 		goto out;
3345 
3346 	error = _unlock_lock(r, lkb);
3347  out:
3348 	unlock_rsb(r);
3349 	put_rsb(r);
3350 	return error;
3351 }
3352 
3353 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3354 		       struct dlm_args *args)
3355 {
3356 	struct dlm_rsb *r;
3357 	int error;
3358 
3359 	r = lkb->lkb_resource;
3360 
3361 	hold_rsb(r);
3362 	lock_rsb(r);
3363 
3364 	error = validate_unlock_args(lkb, args);
3365 	if (error)
3366 		goto out;
3367 
3368 	error = _cancel_lock(r, lkb);
3369  out:
3370 	unlock_rsb(r);
3371 	put_rsb(r);
3372 	return error;
3373 }
3374 
3375 /*
3376  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
3377  */
3378 
3379 int dlm_lock(dlm_lockspace_t *lockspace,
3380 	     int mode,
3381 	     struct dlm_lksb *lksb,
3382 	     uint32_t flags,
3383 	     const void *name,
3384 	     unsigned int namelen,
3385 	     uint32_t parent_lkid,
3386 	     void (*ast) (void *astarg),
3387 	     void *astarg,
3388 	     void (*bast) (void *astarg, int mode))
3389 {
3390 	struct dlm_ls *ls;
3391 	struct dlm_lkb *lkb;
3392 	struct dlm_args args;
3393 	int error, convert = flags & DLM_LKF_CONVERT;
3394 
3395 	ls = dlm_find_lockspace_local(lockspace);
3396 	if (!ls)
3397 		return -EINVAL;
3398 
3399 	dlm_lock_recovery(ls);
3400 
3401 	if (convert)
3402 		error = find_lkb(ls, lksb->sb_lkid, &lkb);
3403 	else
3404 		error = create_lkb(ls, &lkb);
3405 
3406 	if (error)
3407 		goto out;
3408 
3409 	trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
3410 
3411 	error = set_lock_args(mode, lksb, flags, namelen, ast, astarg, bast,
3412 			      &args);
3413 	if (error)
3414 		goto out_put;
3415 
3416 	if (convert)
3417 		error = convert_lock(ls, lkb, &args);
3418 	else
3419 		error = request_lock(ls, lkb, name, namelen, &args);
3420 
3421 	if (error == -EINPROGRESS)
3422 		error = 0;
3423  out_put:
3424 	trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true);
3425 
3426 	if (convert || error)
3427 		__put_lkb(ls, lkb);
3428 	if (error == -EAGAIN || error == -EDEADLK)
3429 		error = 0;
3430  out:
3431 	dlm_unlock_recovery(ls);
3432 	dlm_put_lockspace(ls);
3433 	return error;
3434 }
3435 
3436 int dlm_unlock(dlm_lockspace_t *lockspace,
3437 	       uint32_t lkid,
3438 	       uint32_t flags,
3439 	       struct dlm_lksb *lksb,
3440 	       void *astarg)
3441 {
3442 	struct dlm_ls *ls;
3443 	struct dlm_lkb *lkb;
3444 	struct dlm_args args;
3445 	int error;
3446 
3447 	ls = dlm_find_lockspace_local(lockspace);
3448 	if (!ls)
3449 		return -EINVAL;
3450 
3451 	dlm_lock_recovery(ls);
3452 
3453 	error = find_lkb(ls, lkid, &lkb);
3454 	if (error)
3455 		goto out;
3456 
3457 	trace_dlm_unlock_start(ls, lkb, flags);
3458 
3459 	error = set_unlock_args(flags, astarg, &args);
3460 	if (error)
3461 		goto out_put;
3462 
3463 	if (flags & DLM_LKF_CANCEL)
3464 		error = cancel_lock(ls, lkb, &args);
3465 	else
3466 		error = unlock_lock(ls, lkb, &args);
3467 
3468 	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3469 		error = 0;
3470 	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3471 		error = 0;
3472  out_put:
3473 	trace_dlm_unlock_end(ls, lkb, flags, error);
3474 
3475 	dlm_put_lkb(lkb);
3476  out:
3477 	dlm_unlock_recovery(ls);
3478 	dlm_put_lockspace(ls);
3479 	return error;
3480 }
3481 
3482 /*
3483  * send/receive routines for remote operations and replies
3484  *
3485  * send_args
3486  * send_common
3487  * send_request			receive_request
3488  * send_convert			receive_convert
3489  * send_unlock			receive_unlock
3490  * send_cancel			receive_cancel
3491  * send_grant			receive_grant
3492  * send_bast			receive_bast
3493  * send_lookup			receive_lookup
3494  * send_remove			receive_remove
3495  *
3496  * 				send_common_reply
3497  * receive_request_reply	send_request_reply
3498  * receive_convert_reply	send_convert_reply
3499  * receive_unlock_reply		send_unlock_reply
3500  * receive_cancel_reply		send_cancel_reply
3501  * receive_lookup_reply		send_lookup_reply
3502  */
3503 
3504 static int _create_message(struct dlm_ls *ls, int mb_len,
3505 			   int to_nodeid, int mstype,
3506 			   struct dlm_message **ms_ret,
3507 			   struct dlm_mhandle **mh_ret)
3508 {
3509 	struct dlm_message *ms;
3510 	struct dlm_mhandle *mh;
3511 	char *mb;
3512 
3513 	/* get_buffer gives us a message handle (mh) that we need to
3514 	   pass into midcomms_commit and a message buffer (mb) that we
3515 	   write our data into */
3516 
3517 	mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb);
3518 	if (!mh)
3519 		return -ENOBUFS;
3520 
3521 	ms = (struct dlm_message *) mb;
3522 
3523 	ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3524 	ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
3525 	ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
3526 	ms->m_header.h_length = cpu_to_le16(mb_len);
3527 	ms->m_header.h_cmd = DLM_MSG;
3528 
3529 	ms->m_type = cpu_to_le32(mstype);
3530 
3531 	*mh_ret = mh;
3532 	*ms_ret = ms;
3533 	return 0;
3534 }
3535 
3536 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3537 			  int to_nodeid, int mstype,
3538 			  struct dlm_message **ms_ret,
3539 			  struct dlm_mhandle **mh_ret)
3540 {
3541 	int mb_len = sizeof(struct dlm_message);
3542 
3543 	switch (mstype) {
3544 	case DLM_MSG_REQUEST:
3545 	case DLM_MSG_LOOKUP:
3546 	case DLM_MSG_REMOVE:
3547 		mb_len += r->res_length;
3548 		break;
3549 	case DLM_MSG_CONVERT:
3550 	case DLM_MSG_UNLOCK:
3551 	case DLM_MSG_REQUEST_REPLY:
3552 	case DLM_MSG_CONVERT_REPLY:
3553 	case DLM_MSG_GRANT:
3554 		if (lkb && lkb->lkb_lvbptr && (lkb->lkb_exflags & DLM_LKF_VALBLK))
3555 			mb_len += r->res_ls->ls_lvblen;
3556 		break;
3557 	}
3558 
3559 	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3560 			       ms_ret, mh_ret);
3561 }
3562 
3563 /* further lowcomms enhancements or alternate implementations may make
3564    the return value from this function useful at some point */
3565 
/*
 * Commit a prepared message handle to midcomms together with the resource
 * name.  Always returns 0; the ms argument is not used here.
 */
static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms,
			const void *name, int namelen)
{
	dlm_midcomms_commit_mhandle(mh, name, namelen);

	return 0;
}
3572 
3573 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3574 		      struct dlm_message *ms)
3575 {
3576 	ms->m_nodeid   = cpu_to_le32(lkb->lkb_nodeid);
3577 	ms->m_pid      = cpu_to_le32(lkb->lkb_ownpid);
3578 	ms->m_lkid     = cpu_to_le32(lkb->lkb_id);
3579 	ms->m_remid    = cpu_to_le32(lkb->lkb_remid);
3580 	ms->m_exflags  = cpu_to_le32(lkb->lkb_exflags);
3581 	ms->m_sbflags  = cpu_to_le32(dlm_sbflags_val(lkb));
3582 	ms->m_flags    = cpu_to_le32(dlm_dflags_val(lkb));
3583 	ms->m_lvbseq   = cpu_to_le32(lkb->lkb_lvbseq);
3584 	ms->m_status   = cpu_to_le32(lkb->lkb_status);
3585 	ms->m_grmode   = cpu_to_le32(lkb->lkb_grmode);
3586 	ms->m_rqmode   = cpu_to_le32(lkb->lkb_rqmode);
3587 	ms->m_hash     = cpu_to_le32(r->res_hash);
3588 
3589 	/* m_result and m_bastmode are set from function args,
3590 	   not from lkb fields */
3591 
3592 	if (lkb->lkb_bastfn)
3593 		ms->m_asts |= cpu_to_le32(DLM_CB_BAST);
3594 	if (lkb->lkb_astfn)
3595 		ms->m_asts |= cpu_to_le32(DLM_CB_CAST);
3596 
3597 	/* compare with switch in create_message; send_remove() doesn't
3598 	   use send_args() */
3599 
3600 	switch (ms->m_type) {
3601 	case cpu_to_le32(DLM_MSG_REQUEST):
3602 	case cpu_to_le32(DLM_MSG_LOOKUP):
3603 		memcpy(ms->m_extra, r->res_name, r->res_length);
3604 		break;
3605 	case cpu_to_le32(DLM_MSG_CONVERT):
3606 	case cpu_to_le32(DLM_MSG_UNLOCK):
3607 	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3608 	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3609 	case cpu_to_le32(DLM_MSG_GRANT):
3610 		if (!lkb->lkb_lvbptr || !(lkb->lkb_exflags & DLM_LKF_VALBLK))
3611 			break;
3612 		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3613 		break;
3614 	}
3615 }
3616 
3617 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3618 {
3619 	struct dlm_message *ms;
3620 	struct dlm_mhandle *mh;
3621 	int to_nodeid, error;
3622 
3623 	to_nodeid = r->res_nodeid;
3624 
3625 	error = add_to_waiters(lkb, mstype, to_nodeid);
3626 	if (error)
3627 		return error;
3628 
3629 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3630 	if (error)
3631 		goto fail;
3632 
3633 	send_args(r, lkb, ms);
3634 
3635 	error = send_message(mh, ms, r->res_name, r->res_length);
3636 	if (error)
3637 		goto fail;
3638 	return 0;
3639 
3640  fail:
3641 	remove_from_waiters(lkb, msg_reply_type(mstype));
3642 	return error;
3643 }
3644 
3645 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3646 {
3647 	return send_common(r, lkb, DLM_MSG_REQUEST);
3648 }
3649 
3650 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3651 {
3652 	int error;
3653 
3654 	error = send_common(r, lkb, DLM_MSG_CONVERT);
3655 
3656 	/* down conversions go without a reply from the master */
3657 	if (!error && down_conversion(lkb)) {
3658 		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3659 		r->res_ls->ls_local_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
3660 		r->res_ls->ls_local_ms.m_result = 0;
3661 		__receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms, true);
3662 	}
3663 
3664 	return error;
3665 }
3666 
3667 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3668    MASTER_UNCERTAIN to force the next request on the rsb to confirm
3669    that the master is still correct. */
3670 
3671 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3672 {
3673 	return send_common(r, lkb, DLM_MSG_UNLOCK);
3674 }
3675 
3676 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3677 {
3678 	return send_common(r, lkb, DLM_MSG_CANCEL);
3679 }
3680 
3681 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3682 {
3683 	struct dlm_message *ms;
3684 	struct dlm_mhandle *mh;
3685 	int to_nodeid, error;
3686 
3687 	to_nodeid = lkb->lkb_nodeid;
3688 
3689 	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3690 	if (error)
3691 		goto out;
3692 
3693 	send_args(r, lkb, ms);
3694 
3695 	ms->m_result = 0;
3696 
3697 	error = send_message(mh, ms, r->res_name, r->res_length);
3698  out:
3699 	return error;
3700 }
3701 
3702 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3703 {
3704 	struct dlm_message *ms;
3705 	struct dlm_mhandle *mh;
3706 	int to_nodeid, error;
3707 
3708 	to_nodeid = lkb->lkb_nodeid;
3709 
3710 	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3711 	if (error)
3712 		goto out;
3713 
3714 	send_args(r, lkb, ms);
3715 
3716 	ms->m_bastmode = cpu_to_le32(mode);
3717 
3718 	error = send_message(mh, ms, r->res_name, r->res_length);
3719  out:
3720 	return error;
3721 }
3722 
3723 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3724 {
3725 	struct dlm_message *ms;
3726 	struct dlm_mhandle *mh;
3727 	int to_nodeid, error;
3728 
3729 	to_nodeid = dlm_dir_nodeid(r);
3730 
3731 	error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3732 	if (error)
3733 		return error;
3734 
3735 	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3736 	if (error)
3737 		goto fail;
3738 
3739 	send_args(r, lkb, ms);
3740 
3741 	error = send_message(mh, ms, r->res_name, r->res_length);
3742 	if (error)
3743 		goto fail;
3744 	return 0;
3745 
3746  fail:
3747 	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3748 	return error;
3749 }
3750 
3751 static int send_remove(struct dlm_rsb *r)
3752 {
3753 	struct dlm_message *ms;
3754 	struct dlm_mhandle *mh;
3755 	int to_nodeid, error;
3756 
3757 	to_nodeid = dlm_dir_nodeid(r);
3758 
3759 	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3760 	if (error)
3761 		goto out;
3762 
3763 	memcpy(ms->m_extra, r->res_name, r->res_length);
3764 	ms->m_hash = cpu_to_le32(r->res_hash);
3765 
3766 	error = send_message(mh, ms, r->res_name, r->res_length);
3767  out:
3768 	return error;
3769 }
3770 
3771 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3772 			     int mstype, int rv)
3773 {
3774 	struct dlm_message *ms;
3775 	struct dlm_mhandle *mh;
3776 	int to_nodeid, error;
3777 
3778 	to_nodeid = lkb->lkb_nodeid;
3779 
3780 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3781 	if (error)
3782 		goto out;
3783 
3784 	send_args(r, lkb, ms);
3785 
3786 	ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3787 
3788 	error = send_message(mh, ms, r->res_name, r->res_length);
3789  out:
3790 	return error;
3791 }
3792 
3793 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3794 {
3795 	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3796 }
3797 
3798 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3799 {
3800 	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3801 }
3802 
3803 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3804 {
3805 	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3806 }
3807 
3808 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3809 {
3810 	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3811 }
3812 
3813 static int send_lookup_reply(struct dlm_ls *ls,
3814 			     const struct dlm_message *ms_in, int ret_nodeid,
3815 			     int rv)
3816 {
3817 	struct dlm_rsb *r = &ls->ls_local_rsb;
3818 	struct dlm_message *ms;
3819 	struct dlm_mhandle *mh;
3820 	int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
3821 
3822 	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3823 	if (error)
3824 		goto out;
3825 
3826 	ms->m_lkid = ms_in->m_lkid;
3827 	ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3828 	ms->m_nodeid = cpu_to_le32(ret_nodeid);
3829 
3830 	error = send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in));
3831  out:
3832 	return error;
3833 }
3834 
3835 /* which args we save from a received message depends heavily on the type
3836    of message, unlike the send side where we can safely send everything about
3837    the lkb for any type of message */
3838 
3839 static void receive_flags(struct dlm_lkb *lkb, const struct dlm_message *ms)
3840 {
3841 	lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
3842 	dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3843 	dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3844 }
3845 
3846 static void receive_flags_reply(struct dlm_lkb *lkb,
3847 				const struct dlm_message *ms,
3848 				bool local)
3849 {
3850 	if (local)
3851 		return;
3852 
3853 	dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3854 	dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3855 }
3856 
3857 static int receive_extralen(const struct dlm_message *ms)
3858 {
3859 	return (le16_to_cpu(ms->m_header.h_length) -
3860 		sizeof(struct dlm_message));
3861 }
3862 
3863 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3864 		       const struct dlm_message *ms)
3865 {
3866 	int len;
3867 
3868 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3869 		if (!lkb->lkb_lvbptr)
3870 			lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3871 		if (!lkb->lkb_lvbptr)
3872 			return -ENOMEM;
3873 		len = receive_extralen(ms);
3874 		if (len > ls->ls_lvblen)
3875 			len = ls->ls_lvblen;
3876 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3877 	}
3878 	return 0;
3879 }
3880 
/*
 * Placeholder blocking-ast callback; receive_request_args() stores it in
 * an lkb when the sender flagged DLM_CB_BAST.  It only logs a message, as
 * it is not expected to run.
 */
static void fake_bastfn(void *astparam, int mode)
{
	log_print("fake_bastfn should not be called");
}
3885 
/*
 * Placeholder completion-ast callback; receive_request_args() stores it in
 * an lkb when the sender flagged DLM_CB_CAST.  It only logs a message, as
 * it is not expected to run.
 */
static void fake_astfn(void *astparam)
{
	log_print("fake_astfn should not be called");
}
3890 
3891 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3892 				const struct dlm_message *ms)
3893 {
3894 	lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3895 	lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
3896 	lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3897 	lkb->lkb_grmode = DLM_LOCK_IV;
3898 	lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3899 
3900 	lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;
3901 	lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL;
3902 
3903 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3904 		/* lkb was just created so there won't be an lvb yet */
3905 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3906 		if (!lkb->lkb_lvbptr)
3907 			return -ENOMEM;
3908 	}
3909 
3910 	return 0;
3911 }
3912 
3913 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3914 				const struct dlm_message *ms)
3915 {
3916 	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3917 		return -EBUSY;
3918 
3919 	if (receive_lvb(ls, lkb, ms))
3920 		return -ENOMEM;
3921 
3922 	lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3923 	lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
3924 
3925 	return 0;
3926 }
3927 
3928 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3929 			       const struct dlm_message *ms)
3930 {
3931 	if (receive_lvb(ls, lkb, ms))
3932 		return -ENOMEM;
3933 	return 0;
3934 }
3935 
3936 /* We fill in the local-lkb fields with the info that send_xxxx_reply()
3937    uses to send a reply and that the remote end uses to process the reply. */
3938 
3939 static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms)
3940 {
3941 	struct dlm_lkb *lkb = &ls->ls_local_lkb;
3942 	lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3943 	lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3944 }
3945 
3946 /* This is called after the rsb is locked so that we can safely inspect
3947    fields in the lkb. */
3948 
3949 static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
3950 {
3951 	int from = le32_to_cpu(ms->m_header.h_nodeid);
3952 	int error = 0;
3953 
3954 	/* currently mixing of user/kernel locks are not supported */
3955 	if (ms->m_flags & cpu_to_le32(BIT(DLM_DFL_USER_BIT)) &&
3956 	    !test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
3957 		log_error(lkb->lkb_resource->res_ls,
3958 			  "got user dlm message for a kernel lock");
3959 		error = -EINVAL;
3960 		goto out;
3961 	}
3962 
3963 	switch (ms->m_type) {
3964 	case cpu_to_le32(DLM_MSG_CONVERT):
3965 	case cpu_to_le32(DLM_MSG_UNLOCK):
3966 	case cpu_to_le32(DLM_MSG_CANCEL):
3967 		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3968 			error = -EINVAL;
3969 		break;
3970 
3971 	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3972 	case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
3973 	case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
3974 	case cpu_to_le32(DLM_MSG_GRANT):
3975 	case cpu_to_le32(DLM_MSG_BAST):
3976 		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3977 			error = -EINVAL;
3978 		break;
3979 
3980 	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3981 		if (!is_process_copy(lkb))
3982 			error = -EINVAL;
3983 		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3984 			error = -EINVAL;
3985 		break;
3986 
3987 	default:
3988 		error = -EINVAL;
3989 	}
3990 
3991 out:
3992 	if (error)
3993 		log_error(lkb->lkb_resource->res_ls,
3994 			  "ignore invalid message %d from %d %x %x %x %d",
3995 			  le32_to_cpu(ms->m_type), from, lkb->lkb_id,
3996 			  lkb->lkb_remid, dlm_iflags_val(lkb),
3997 			  lkb->lkb_nodeid);
3998 	return error;
3999 }
4000 
4001 static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms)
4002 {
4003 	struct dlm_lkb *lkb;
4004 	struct dlm_rsb *r;
4005 	int from_nodeid;
4006 	int error, namelen = 0;
4007 
4008 	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4009 
4010 	error = create_lkb(ls, &lkb);
4011 	if (error)
4012 		goto fail;
4013 
4014 	receive_flags(lkb, ms);
4015 	set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
4016 	error = receive_request_args(ls, lkb, ms);
4017 	if (error) {
4018 		__put_lkb(ls, lkb);
4019 		goto fail;
4020 	}
4021 
4022 	/* The dir node is the authority on whether we are the master
4023 	   for this rsb or not, so if the master sends us a request, we should
4024 	   recreate the rsb if we've destroyed it.   This race happens when we
4025 	   send a remove message to the dir node at the same time that the dir
4026 	   node sends us a request for the rsb. */
4027 
4028 	namelen = receive_extralen(ms);
4029 
4030 	error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4031 			 R_RECEIVE_REQUEST, &r);
4032 	if (error) {
4033 		__put_lkb(ls, lkb);
4034 		goto fail;
4035 	}
4036 
4037 	lock_rsb(r);
4038 
4039 	if (r->res_master_nodeid != dlm_our_nodeid()) {
4040 		error = validate_master_nodeid(ls, r, from_nodeid);
4041 		if (error) {
4042 			unlock_rsb(r);
4043 			put_rsb(r);
4044 			__put_lkb(ls, lkb);
4045 			goto fail;
4046 		}
4047 	}
4048 
4049 	attach_lkb(r, lkb);
4050 	error = do_request(r, lkb);
4051 	send_request_reply(r, lkb, error);
4052 	do_request_effects(r, lkb, error);
4053 
4054 	unlock_rsb(r);
4055 	put_rsb(r);
4056 
4057 	if (error == -EINPROGRESS)
4058 		error = 0;
4059 	if (error)
4060 		dlm_put_lkb(lkb);
4061 	return 0;
4062 
4063  fail:
4064 	/* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4065 	   and do this receive_request again from process_lookup_list once
4066 	   we get the lookup reply.  This would avoid a many repeated
4067 	   ENOTBLK request failures when the lookup reply designating us
4068 	   as master is delayed. */
4069 
4070 	if (error != -ENOTBLK) {
4071 		log_limit(ls, "receive_request %x from %d %d",
4072 			  le32_to_cpu(ms->m_lkid), from_nodeid, error);
4073 	}
4074 
4075 	setup_local_lkb(ls, ms);
4076 	send_request_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4077 	return error;
4078 }
4079 
4080 static int receive_convert(struct dlm_ls *ls, const struct dlm_message *ms)
4081 {
4082 	struct dlm_lkb *lkb;
4083 	struct dlm_rsb *r;
4084 	int error, reply = 1;
4085 
4086 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4087 	if (error)
4088 		goto fail;
4089 
4090 	if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4091 		log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4092 			  "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4093 			  (unsigned long long)lkb->lkb_recover_seq,
4094 			  le32_to_cpu(ms->m_header.h_nodeid),
4095 			  le32_to_cpu(ms->m_lkid));
4096 		error = -ENOENT;
4097 		dlm_put_lkb(lkb);
4098 		goto fail;
4099 	}
4100 
4101 	r = lkb->lkb_resource;
4102 
4103 	hold_rsb(r);
4104 	lock_rsb(r);
4105 
4106 	error = validate_message(lkb, ms);
4107 	if (error)
4108 		goto out;
4109 
4110 	receive_flags(lkb, ms);
4111 
4112 	error = receive_convert_args(ls, lkb, ms);
4113 	if (error) {
4114 		send_convert_reply(r, lkb, error);
4115 		goto out;
4116 	}
4117 
4118 	reply = !down_conversion(lkb);
4119 
4120 	error = do_convert(r, lkb);
4121 	if (reply)
4122 		send_convert_reply(r, lkb, error);
4123 	do_convert_effects(r, lkb, error);
4124  out:
4125 	unlock_rsb(r);
4126 	put_rsb(r);
4127 	dlm_put_lkb(lkb);
4128 	return 0;
4129 
4130  fail:
4131 	setup_local_lkb(ls, ms);
4132 	send_convert_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4133 	return error;
4134 }
4135 
4136 static int receive_unlock(struct dlm_ls *ls, const struct dlm_message *ms)
4137 {
4138 	struct dlm_lkb *lkb;
4139 	struct dlm_rsb *r;
4140 	int error;
4141 
4142 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4143 	if (error)
4144 		goto fail;
4145 
4146 	if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4147 		log_error(ls, "receive_unlock %x remid %x remote %d %x",
4148 			  lkb->lkb_id, lkb->lkb_remid,
4149 			  le32_to_cpu(ms->m_header.h_nodeid),
4150 			  le32_to_cpu(ms->m_lkid));
4151 		error = -ENOENT;
4152 		dlm_put_lkb(lkb);
4153 		goto fail;
4154 	}
4155 
4156 	r = lkb->lkb_resource;
4157 
4158 	hold_rsb(r);
4159 	lock_rsb(r);
4160 
4161 	error = validate_message(lkb, ms);
4162 	if (error)
4163 		goto out;
4164 
4165 	receive_flags(lkb, ms);
4166 
4167 	error = receive_unlock_args(ls, lkb, ms);
4168 	if (error) {
4169 		send_unlock_reply(r, lkb, error);
4170 		goto out;
4171 	}
4172 
4173 	error = do_unlock(r, lkb);
4174 	send_unlock_reply(r, lkb, error);
4175 	do_unlock_effects(r, lkb, error);
4176  out:
4177 	unlock_rsb(r);
4178 	put_rsb(r);
4179 	dlm_put_lkb(lkb);
4180 	return 0;
4181 
4182  fail:
4183 	setup_local_lkb(ls, ms);
4184 	send_unlock_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4185 	return error;
4186 }
4187 
4188 static int receive_cancel(struct dlm_ls *ls, const struct dlm_message *ms)
4189 {
4190 	struct dlm_lkb *lkb;
4191 	struct dlm_rsb *r;
4192 	int error;
4193 
4194 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4195 	if (error)
4196 		goto fail;
4197 
4198 	receive_flags(lkb, ms);
4199 
4200 	r = lkb->lkb_resource;
4201 
4202 	hold_rsb(r);
4203 	lock_rsb(r);
4204 
4205 	error = validate_message(lkb, ms);
4206 	if (error)
4207 		goto out;
4208 
4209 	error = do_cancel(r, lkb);
4210 	send_cancel_reply(r, lkb, error);
4211 	do_cancel_effects(r, lkb, error);
4212  out:
4213 	unlock_rsb(r);
4214 	put_rsb(r);
4215 	dlm_put_lkb(lkb);
4216 	return 0;
4217 
4218  fail:
4219 	setup_local_lkb(ls, ms);
4220 	send_cancel_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4221 	return error;
4222 }
4223 
4224 static int receive_grant(struct dlm_ls *ls, const struct dlm_message *ms)
4225 {
4226 	struct dlm_lkb *lkb;
4227 	struct dlm_rsb *r;
4228 	int error;
4229 
4230 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4231 	if (error)
4232 		return error;
4233 
4234 	r = lkb->lkb_resource;
4235 
4236 	hold_rsb(r);
4237 	lock_rsb(r);
4238 
4239 	error = validate_message(lkb, ms);
4240 	if (error)
4241 		goto out;
4242 
4243 	receive_flags_reply(lkb, ms, false);
4244 	if (is_altmode(lkb))
4245 		munge_altmode(lkb, ms);
4246 	grant_lock_pc(r, lkb, ms);
4247 	queue_cast(r, lkb, 0);
4248  out:
4249 	unlock_rsb(r);
4250 	put_rsb(r);
4251 	dlm_put_lkb(lkb);
4252 	return 0;
4253 }
4254 
4255 static int receive_bast(struct dlm_ls *ls, const struct dlm_message *ms)
4256 {
4257 	struct dlm_lkb *lkb;
4258 	struct dlm_rsb *r;
4259 	int error;
4260 
4261 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4262 	if (error)
4263 		return error;
4264 
4265 	r = lkb->lkb_resource;
4266 
4267 	hold_rsb(r);
4268 	lock_rsb(r);
4269 
4270 	error = validate_message(lkb, ms);
4271 	if (error)
4272 		goto out;
4273 
4274 	queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode));
4275 	lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
4276  out:
4277 	unlock_rsb(r);
4278 	put_rsb(r);
4279 	dlm_put_lkb(lkb);
4280 	return 0;
4281 }
4282 
4283 static void receive_lookup(struct dlm_ls *ls, const struct dlm_message *ms)
4284 {
4285 	int len, error, ret_nodeid, from_nodeid, our_nodeid;
4286 
4287 	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4288 	our_nodeid = dlm_our_nodeid();
4289 
4290 	len = receive_extralen(ms);
4291 
4292 	error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4293 				  &ret_nodeid, NULL);
4294 
4295 	/* Optimization: we're master so treat lookup as a request */
4296 	if (!error && ret_nodeid == our_nodeid) {
4297 		receive_request(ls, ms);
4298 		return;
4299 	}
4300 	send_lookup_reply(ls, ms, ret_nodeid, error);
4301 }
4302 
4303 static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
4304 {
4305 	char name[DLM_RESNAME_MAXLEN+1];
4306 	struct dlm_rsb *r;
4307 	int rv, len, dir_nodeid, from_nodeid;
4308 
4309 	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4310 
4311 	len = receive_extralen(ms);
4312 
4313 	if (len > DLM_RESNAME_MAXLEN) {
4314 		log_error(ls, "receive_remove from %d bad len %d",
4315 			  from_nodeid, len);
4316 		return;
4317 	}
4318 
4319 	dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash));
4320 	if (dir_nodeid != dlm_our_nodeid()) {
4321 		log_error(ls, "receive_remove from %d bad nodeid %d",
4322 			  from_nodeid, dir_nodeid);
4323 		return;
4324 	}
4325 
4326 	/* Look for name in rsb toss state, if it's there, kill it.
4327 	 * If it's in non toss state, it's being used, and we should ignore this
4328 	 * message.  This is an expected race between the dir node sending a
4329 	 * request to the master node at the same time as the master node sends
4330 	 * a remove to the dir node.  The resolution to that race is for the
4331 	 * dir node to ignore the remove message, and the master node to
4332 	 * recreate the master rsb when it gets a request from the dir node for
4333 	 * an rsb it doesn't have.
4334 	 */
4335 
4336 	memset(name, 0, sizeof(name));
4337 	memcpy(name, ms->m_extra, len);
4338 
4339 	write_lock_bh(&ls->ls_rsbtbl_lock);
4340 
4341 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
4342 	if (rv) {
4343 		/* should not happen */
4344 		log_error(ls, "%s from %d not found %s", __func__,
4345 			  from_nodeid, name);
4346 		write_unlock_bh(&ls->ls_rsbtbl_lock);
4347 		return;
4348 	}
4349 
4350 	if (!rsb_flag(r, RSB_TOSS)) {
4351 		if (r->res_master_nodeid != from_nodeid) {
4352 			/* should not happen */
4353 			log_error(ls, "receive_remove keep from %d master %d",
4354 				  from_nodeid, r->res_master_nodeid);
4355 			dlm_print_rsb(r);
4356 			write_unlock_bh(&ls->ls_rsbtbl_lock);
4357 			return;
4358 		}
4359 
4360 		log_debug(ls, "receive_remove from %d master %d first %x %s",
4361 			  from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4362 			  name);
4363 		write_unlock_bh(&ls->ls_rsbtbl_lock);
4364 		return;
4365 	}
4366 
4367 	if (r->res_master_nodeid != from_nodeid) {
4368 		log_error(ls, "receive_remove toss from %d master %d",
4369 			  from_nodeid, r->res_master_nodeid);
4370 		dlm_print_rsb(r);
4371 		write_unlock_bh(&ls->ls_rsbtbl_lock);
4372 		return;
4373 	}
4374 
4375 	list_del(&r->res_rsbs_list);
4376 	rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
4377 			       dlm_rhash_rsb_params);
4378 	write_unlock_bh(&ls->ls_rsbtbl_lock);
4379 
4380 	free_toss_rsb(r);
4381 }
4382 
4383 static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms)
4384 {
4385 	do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
4386 }
4387 
4388 static int receive_request_reply(struct dlm_ls *ls,
4389 				 const struct dlm_message *ms)
4390 {
4391 	struct dlm_lkb *lkb;
4392 	struct dlm_rsb *r;
4393 	int error, mstype, result;
4394 	int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4395 
4396 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4397 	if (error)
4398 		return error;
4399 
4400 	r = lkb->lkb_resource;
4401 	hold_rsb(r);
4402 	lock_rsb(r);
4403 
4404 	error = validate_message(lkb, ms);
4405 	if (error)
4406 		goto out;
4407 
4408 	mstype = lkb->lkb_wait_type;
4409 	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4410 	if (error) {
4411 		log_error(ls, "receive_request_reply %x remote %d %x result %d",
4412 			  lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid),
4413 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4414 		dlm_dump_rsb(r);
4415 		goto out;
4416 	}
4417 
4418 	/* Optimization: the dir node was also the master, so it took our
4419 	   lookup as a request and sent request reply instead of lookup reply */
4420 	if (mstype == DLM_MSG_LOOKUP) {
4421 		r->res_master_nodeid = from_nodeid;
4422 		r->res_nodeid = from_nodeid;
4423 		lkb->lkb_nodeid = from_nodeid;
4424 	}
4425 
4426 	/* this is the value returned from do_request() on the master */
4427 	result = from_dlm_errno(le32_to_cpu(ms->m_result));
4428 
4429 	switch (result) {
4430 	case -EAGAIN:
4431 		/* request would block (be queued) on remote master */
4432 		queue_cast(r, lkb, -EAGAIN);
4433 		confirm_master(r, -EAGAIN);
4434 		unhold_lkb(lkb); /* undoes create_lkb() */
4435 		break;
4436 
4437 	case -EINPROGRESS:
4438 	case 0:
4439 		/* request was queued or granted on remote master */
4440 		receive_flags_reply(lkb, ms, false);
4441 		lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
4442 		if (is_altmode(lkb))
4443 			munge_altmode(lkb, ms);
4444 		if (result) {
4445 			add_lkb(r, lkb, DLM_LKSTS_WAITING);
4446 		} else {
4447 			grant_lock_pc(r, lkb, ms);
4448 			queue_cast(r, lkb, 0);
4449 		}
4450 		confirm_master(r, result);
4451 		break;
4452 
4453 	case -EBADR:
4454 	case -ENOTBLK:
4455 		/* find_rsb failed to find rsb or rsb wasn't master */
4456 		log_limit(ls, "receive_request_reply %x from %d %d "
4457 			  "master %d dir %d first %x %s", lkb->lkb_id,
4458 			  from_nodeid, result, r->res_master_nodeid,
4459 			  r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4460 
4461 		if (r->res_dir_nodeid != dlm_our_nodeid() &&
4462 		    r->res_master_nodeid != dlm_our_nodeid()) {
4463 			/* cause _request_lock->set_master->send_lookup */
4464 			r->res_master_nodeid = 0;
4465 			r->res_nodeid = -1;
4466 			lkb->lkb_nodeid = -1;
4467 		}
4468 
4469 		if (is_overlap(lkb)) {
4470 			/* we'll ignore error in cancel/unlock reply */
4471 			queue_cast_overlap(r, lkb);
4472 			confirm_master(r, result);
4473 			unhold_lkb(lkb); /* undoes create_lkb() */
4474 		} else {
4475 			_request_lock(r, lkb);
4476 
4477 			if (r->res_master_nodeid == dlm_our_nodeid())
4478 				confirm_master(r, 0);
4479 		}
4480 		break;
4481 
4482 	default:
4483 		log_error(ls, "receive_request_reply %x error %d",
4484 			  lkb->lkb_id, result);
4485 	}
4486 
4487 	if ((result == 0 || result == -EINPROGRESS) &&
4488 	    test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
4489 		log_debug(ls, "receive_request_reply %x result %d unlock",
4490 			  lkb->lkb_id, result);
4491 		clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4492 		send_unlock(r, lkb);
4493 	} else if ((result == -EINPROGRESS) &&
4494 		   test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
4495 				      &lkb->lkb_iflags)) {
4496 		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4497 		clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4498 		send_cancel(r, lkb);
4499 	} else {
4500 		clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4501 		clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4502 	}
4503  out:
4504 	unlock_rsb(r);
4505 	put_rsb(r);
4506 	dlm_put_lkb(lkb);
4507 	return 0;
4508 }
4509 
4510 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4511 				    const struct dlm_message *ms, bool local)
4512 {
4513 	/* this is the value returned from do_convert() on the master */
4514 	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4515 	case -EAGAIN:
4516 		/* convert would block (be queued) on remote master */
4517 		queue_cast(r, lkb, -EAGAIN);
4518 		break;
4519 
4520 	case -EDEADLK:
4521 		receive_flags_reply(lkb, ms, local);
4522 		revert_lock_pc(r, lkb);
4523 		queue_cast(r, lkb, -EDEADLK);
4524 		break;
4525 
4526 	case -EINPROGRESS:
4527 		/* convert was queued on remote master */
4528 		receive_flags_reply(lkb, ms, local);
4529 		if (is_demoted(lkb))
4530 			munge_demoted(lkb);
4531 		del_lkb(r, lkb);
4532 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4533 		break;
4534 
4535 	case 0:
4536 		/* convert was granted on remote master */
4537 		receive_flags_reply(lkb, ms, local);
4538 		if (is_demoted(lkb))
4539 			munge_demoted(lkb);
4540 		grant_lock_pc(r, lkb, ms);
4541 		queue_cast(r, lkb, 0);
4542 		break;
4543 
4544 	default:
4545 		log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4546 			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4547 			  le32_to_cpu(ms->m_lkid),
4548 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4549 		dlm_print_rsb(r);
4550 		dlm_print_lkb(lkb);
4551 	}
4552 }
4553 
4554 static void _receive_convert_reply(struct dlm_lkb *lkb,
4555 				   const struct dlm_message *ms, bool local)
4556 {
4557 	struct dlm_rsb *r = lkb->lkb_resource;
4558 	int error;
4559 
4560 	hold_rsb(r);
4561 	lock_rsb(r);
4562 
4563 	error = validate_message(lkb, ms);
4564 	if (error)
4565 		goto out;
4566 
4567 	error = remove_from_waiters_ms(lkb, ms, local);
4568 	if (error)
4569 		goto out;
4570 
4571 	__receive_convert_reply(r, lkb, ms, local);
4572  out:
4573 	unlock_rsb(r);
4574 	put_rsb(r);
4575 }
4576 
4577 static int receive_convert_reply(struct dlm_ls *ls,
4578 				 const struct dlm_message *ms)
4579 {
4580 	struct dlm_lkb *lkb;
4581 	int error;
4582 
4583 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4584 	if (error)
4585 		return error;
4586 
4587 	_receive_convert_reply(lkb, ms, false);
4588 	dlm_put_lkb(lkb);
4589 	return 0;
4590 }
4591 
4592 static void _receive_unlock_reply(struct dlm_lkb *lkb,
4593 				  const struct dlm_message *ms, bool local)
4594 {
4595 	struct dlm_rsb *r = lkb->lkb_resource;
4596 	int error;
4597 
4598 	hold_rsb(r);
4599 	lock_rsb(r);
4600 
4601 	error = validate_message(lkb, ms);
4602 	if (error)
4603 		goto out;
4604 
4605 	error = remove_from_waiters_ms(lkb, ms, local);
4606 	if (error)
4607 		goto out;
4608 
4609 	/* this is the value returned from do_unlock() on the master */
4610 
4611 	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4612 	case -DLM_EUNLOCK:
4613 		receive_flags_reply(lkb, ms, local);
4614 		remove_lock_pc(r, lkb);
4615 		queue_cast(r, lkb, -DLM_EUNLOCK);
4616 		break;
4617 	case -ENOENT:
4618 		break;
4619 	default:
4620 		log_error(r->res_ls, "receive_unlock_reply %x error %d",
4621 			  lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
4622 	}
4623  out:
4624 	unlock_rsb(r);
4625 	put_rsb(r);
4626 }
4627 
4628 static int receive_unlock_reply(struct dlm_ls *ls,
4629 				const struct dlm_message *ms)
4630 {
4631 	struct dlm_lkb *lkb;
4632 	int error;
4633 
4634 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4635 	if (error)
4636 		return error;
4637 
4638 	_receive_unlock_reply(lkb, ms, false);
4639 	dlm_put_lkb(lkb);
4640 	return 0;
4641 }
4642 
4643 static void _receive_cancel_reply(struct dlm_lkb *lkb,
4644 				  const struct dlm_message *ms, bool local)
4645 {
4646 	struct dlm_rsb *r = lkb->lkb_resource;
4647 	int error;
4648 
4649 	hold_rsb(r);
4650 	lock_rsb(r);
4651 
4652 	error = validate_message(lkb, ms);
4653 	if (error)
4654 		goto out;
4655 
4656 	error = remove_from_waiters_ms(lkb, ms, local);
4657 	if (error)
4658 		goto out;
4659 
4660 	/* this is the value returned from do_cancel() on the master */
4661 
4662 	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4663 	case -DLM_ECANCEL:
4664 		receive_flags_reply(lkb, ms, local);
4665 		revert_lock_pc(r, lkb);
4666 		queue_cast(r, lkb, -DLM_ECANCEL);
4667 		break;
4668 	case 0:
4669 		break;
4670 	default:
4671 		log_error(r->res_ls, "receive_cancel_reply %x error %d",
4672 			  lkb->lkb_id,
4673 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4674 	}
4675  out:
4676 	unlock_rsb(r);
4677 	put_rsb(r);
4678 }
4679 
4680 static int receive_cancel_reply(struct dlm_ls *ls,
4681 				const struct dlm_message *ms)
4682 {
4683 	struct dlm_lkb *lkb;
4684 	int error;
4685 
4686 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4687 	if (error)
4688 		return error;
4689 
4690 	_receive_cancel_reply(lkb, ms, false);
4691 	dlm_put_lkb(lkb);
4692 	return 0;
4693 }
4694 
4695 static void receive_lookup_reply(struct dlm_ls *ls,
4696 				 const struct dlm_message *ms)
4697 {
4698 	struct dlm_lkb *lkb;
4699 	struct dlm_rsb *r;
4700 	int error, ret_nodeid;
4701 	int do_lookup_list = 0;
4702 
4703 	error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
4704 	if (error) {
4705 		log_error(ls, "%s no lkid %x", __func__,
4706 			  le32_to_cpu(ms->m_lkid));
4707 		return;
4708 	}
4709 
4710 	/* ms->m_result is the value returned by dlm_master_lookup on dir node
4711 	   FIXME: will a non-zero error ever be returned? */
4712 
4713 	r = lkb->lkb_resource;
4714 	hold_rsb(r);
4715 	lock_rsb(r);
4716 
4717 	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4718 	if (error)
4719 		goto out;
4720 
4721 	ret_nodeid = le32_to_cpu(ms->m_nodeid);
4722 
4723 	/* We sometimes receive a request from the dir node for this
4724 	   rsb before we've received the dir node's loookup_reply for it.
4725 	   The request from the dir node implies we're the master, so we set
4726 	   ourself as master in receive_request_reply, and verify here that
4727 	   we are indeed the master. */
4728 
4729 	if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4730 		/* This should never happen */
4731 		log_error(ls, "receive_lookup_reply %x from %d ret %d "
4732 			  "master %d dir %d our %d first %x %s",
4733 			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4734 			  ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
4735 			  dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4736 	}
4737 
4738 	if (ret_nodeid == dlm_our_nodeid()) {
4739 		r->res_master_nodeid = ret_nodeid;
4740 		r->res_nodeid = 0;
4741 		do_lookup_list = 1;
4742 		r->res_first_lkid = 0;
4743 	} else if (ret_nodeid == -1) {
4744 		/* the remote node doesn't believe it's the dir node */
4745 		log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4746 			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
4747 		r->res_master_nodeid = 0;
4748 		r->res_nodeid = -1;
4749 		lkb->lkb_nodeid = -1;
4750 	} else {
4751 		/* set_master() will set lkb_nodeid from r */
4752 		r->res_master_nodeid = ret_nodeid;
4753 		r->res_nodeid = ret_nodeid;
4754 	}
4755 
4756 	if (is_overlap(lkb)) {
4757 		log_debug(ls, "receive_lookup_reply %x unlock %x",
4758 			  lkb->lkb_id, dlm_iflags_val(lkb));
4759 		queue_cast_overlap(r, lkb);
4760 		unhold_lkb(lkb); /* undoes create_lkb() */
4761 		goto out_list;
4762 	}
4763 
4764 	_request_lock(r, lkb);
4765 
4766  out_list:
4767 	if (do_lookup_list)
4768 		process_lookup_list(r);
4769  out:
4770 	unlock_rsb(r);
4771 	put_rsb(r);
4772 	dlm_put_lkb(lkb);
4773 }
4774 
4775 static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4776 			     uint32_t saved_seq)
4777 {
4778 	int error = 0, noent = 0;
4779 
4780 	if (WARN_ON_ONCE(!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid)))) {
4781 		log_limit(ls, "receive %d from non-member %d %x %x %d",
4782 			  le32_to_cpu(ms->m_type),
4783 			  le32_to_cpu(ms->m_header.h_nodeid),
4784 			  le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4785 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4786 		return;
4787 	}
4788 
4789 	switch (ms->m_type) {
4790 
4791 	/* messages sent to a master node */
4792 
4793 	case cpu_to_le32(DLM_MSG_REQUEST):
4794 		error = receive_request(ls, ms);
4795 		break;
4796 
4797 	case cpu_to_le32(DLM_MSG_CONVERT):
4798 		error = receive_convert(ls, ms);
4799 		break;
4800 
4801 	case cpu_to_le32(DLM_MSG_UNLOCK):
4802 		error = receive_unlock(ls, ms);
4803 		break;
4804 
4805 	case cpu_to_le32(DLM_MSG_CANCEL):
4806 		noent = 1;
4807 		error = receive_cancel(ls, ms);
4808 		break;
4809 
4810 	/* messages sent from a master node (replies to above) */
4811 
4812 	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
4813 		error = receive_request_reply(ls, ms);
4814 		break;
4815 
4816 	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
4817 		error = receive_convert_reply(ls, ms);
4818 		break;
4819 
4820 	case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
4821 		error = receive_unlock_reply(ls, ms);
4822 		break;
4823 
4824 	case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
4825 		error = receive_cancel_reply(ls, ms);
4826 		break;
4827 
4828 	/* messages sent from a master node (only two types of async msg) */
4829 
4830 	case cpu_to_le32(DLM_MSG_GRANT):
4831 		noent = 1;
4832 		error = receive_grant(ls, ms);
4833 		break;
4834 
4835 	case cpu_to_le32(DLM_MSG_BAST):
4836 		noent = 1;
4837 		error = receive_bast(ls, ms);
4838 		break;
4839 
4840 	/* messages sent to a dir node */
4841 
4842 	case cpu_to_le32(DLM_MSG_LOOKUP):
4843 		receive_lookup(ls, ms);
4844 		break;
4845 
4846 	case cpu_to_le32(DLM_MSG_REMOVE):
4847 		receive_remove(ls, ms);
4848 		break;
4849 
4850 	/* messages sent from a dir node (remove has no reply) */
4851 
4852 	case cpu_to_le32(DLM_MSG_LOOKUP_REPLY):
4853 		receive_lookup_reply(ls, ms);
4854 		break;
4855 
4856 	/* other messages */
4857 
4858 	case cpu_to_le32(DLM_MSG_PURGE):
4859 		receive_purge(ls, ms);
4860 		break;
4861 
4862 	default:
4863 		log_error(ls, "unknown message type %d",
4864 			  le32_to_cpu(ms->m_type));
4865 	}
4866 
4867 	/*
4868 	 * When checking for ENOENT, we're checking the result of
4869 	 * find_lkb(m_remid):
4870 	 *
4871 	 * The lock id referenced in the message wasn't found.  This may
4872 	 * happen in normal usage for the async messages and cancel, so
4873 	 * only use log_debug for them.
4874 	 *
4875 	 * Some errors are expected and normal.
4876 	 */
4877 
4878 	if (error == -ENOENT && noent) {
4879 		log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4880 			  le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4881 			  le32_to_cpu(ms->m_header.h_nodeid),
4882 			  le32_to_cpu(ms->m_lkid), saved_seq);
4883 	} else if (error == -ENOENT) {
4884 		log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4885 			  le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4886 			  le32_to_cpu(ms->m_header.h_nodeid),
4887 			  le32_to_cpu(ms->m_lkid), saved_seq);
4888 
4889 		if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT))
4890 			dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash));
4891 	}
4892 
4893 	if (error == -EINVAL) {
4894 		log_error(ls, "receive %d inval from %d lkid %x remid %x "
4895 			  "saved_seq %u",
4896 			  le32_to_cpu(ms->m_type),
4897 			  le32_to_cpu(ms->m_header.h_nodeid),
4898 			  le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4899 			  saved_seq);
4900 	}
4901 }
4902 
4903 /* If the lockspace is in recovery mode (locking stopped), then normal
4904    messages are saved on the requestqueue for processing after recovery is
4905    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4906    messages off the requestqueue before we process new ones. This occurs right
4907    after recovery completes when we transition from saving all messages on
4908    requestqueue, to processing all the saved messages, to processing new
4909    messages as they arrive. */
4910 
4911 static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4912 				int nodeid)
4913 {
4914 try_again:
4915 	read_lock_bh(&ls->ls_requestqueue_lock);
4916 	if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4917 		/* If we were a member of this lockspace, left, and rejoined,
4918 		   other nodes may still be sending us messages from the
4919 		   lockspace generation before we left. */
4920 		if (WARN_ON_ONCE(!ls->ls_generation)) {
4921 			read_unlock_bh(&ls->ls_requestqueue_lock);
4922 			log_limit(ls, "receive %d from %d ignore old gen",
4923 				  le32_to_cpu(ms->m_type), nodeid);
4924 			return;
4925 		}
4926 
4927 		read_unlock_bh(&ls->ls_requestqueue_lock);
4928 		write_lock_bh(&ls->ls_requestqueue_lock);
4929 		/* recheck because we hold writelock now */
4930 		if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4931 			write_unlock_bh(&ls->ls_requestqueue_lock);
4932 			goto try_again;
4933 		}
4934 
4935 		dlm_add_requestqueue(ls, nodeid, ms);
4936 		write_unlock_bh(&ls->ls_requestqueue_lock);
4937 	} else {
4938 		_receive_message(ls, ms, 0);
4939 		read_unlock_bh(&ls->ls_requestqueue_lock);
4940 	}
4941 }
4942 
4943 /* This is called by dlm_recoverd to process messages that were saved on
4944    the requestqueue. */
4945 
4946 void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
4947 			       uint32_t saved_seq)
4948 {
4949 	_receive_message(ls, ms, saved_seq);
4950 }
4951 
4952 /* This is called by the midcomms layer when something is received for
4953    the lockspace.  It could be either a MSG (normal message sent as part of
4954    standard locking activity) or an RCOM (recovery message sent as part of
4955    lockspace recovery). */
4956 
4957 void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
4958 {
4959 	const struct dlm_header *hd = &p->header;
4960 	struct dlm_ls *ls;
4961 	int type = 0;
4962 
4963 	switch (hd->h_cmd) {
4964 	case DLM_MSG:
4965 		type = le32_to_cpu(p->message.m_type);
4966 		break;
4967 	case DLM_RCOM:
4968 		type = le32_to_cpu(p->rcom.rc_type);
4969 		break;
4970 	default:
4971 		log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4972 		return;
4973 	}
4974 
4975 	if (le32_to_cpu(hd->h_nodeid) != nodeid) {
4976 		log_print("invalid h_nodeid %d from %d lockspace %x",
4977 			  le32_to_cpu(hd->h_nodeid), nodeid,
4978 			  le32_to_cpu(hd->u.h_lockspace));
4979 		return;
4980 	}
4981 
4982 	ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace));
4983 	if (!ls) {
4984 		if (dlm_config.ci_log_debug) {
4985 			printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
4986 				"%u from %d cmd %d type %d\n",
4987 				le32_to_cpu(hd->u.h_lockspace), nodeid,
4988 				hd->h_cmd, type);
4989 		}
4990 
4991 		if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4992 			dlm_send_ls_not_ready(nodeid, &p->rcom);
4993 		return;
4994 	}
4995 
4996 	/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4997 	   be inactive (in this ls) before transitioning to recovery mode */
4998 
4999 	read_lock_bh(&ls->ls_recv_active);
5000 	if (hd->h_cmd == DLM_MSG)
5001 		dlm_receive_message(ls, &p->message, nodeid);
5002 	else if (hd->h_cmd == DLM_RCOM)
5003 		dlm_receive_rcom(ls, &p->rcom, nodeid);
5004 	else
5005 		log_error(ls, "invalid h_cmd %d from %d lockspace %x",
5006 			  hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
5007 	read_unlock_bh(&ls->ls_recv_active);
5008 
5009 	dlm_put_lockspace(ls);
5010 }
5011 
5012 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
5013 				   struct dlm_message *ms_local)
5014 {
5015 	if (middle_conversion(lkb)) {
5016 		hold_lkb(lkb);
5017 		memset(ms_local, 0, sizeof(struct dlm_message));
5018 		ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
5019 		ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
5020 		ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5021 		_receive_convert_reply(lkb, ms_local, true);
5022 
5023 		/* Same special case as in receive_rcom_lock_args() */
5024 		lkb->lkb_grmode = DLM_LOCK_IV;
5025 		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
5026 		unhold_lkb(lkb);
5027 
5028 	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
5029 		set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5030 	}
5031 
5032 	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5033 	   conversions are async; there's no reply from the remote master */
5034 }
5035 
5036 /* A waiting lkb needs recovery if the master node has failed, or
5037    the master node is changing (only when no directory is used) */
5038 
5039 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5040 				 int dir_nodeid)
5041 {
5042 	if (dlm_no_directory(ls))
5043 		return 1;
5044 
5045 	if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5046 		return 1;
5047 
5048 	return 0;
5049 }
5050 
5051 /* Recovery for locks that are waiting for replies from nodes that are now
5052    gone.  We can just complete unlocks and cancels by faking a reply from the
5053    dead node.  Requests and up-conversions we flag to be resent after
5054    recovery.  Down-conversions can just be completed with a fake reply like
5055    unlocks.  Conversions between PR and CW need special attention. */
5056 
5057 void dlm_recover_waiters_pre(struct dlm_ls *ls)
5058 {
5059 	struct dlm_lkb *lkb, *safe;
5060 	struct dlm_message *ms_local;
5061 	int wait_type, local_unlock_result, local_cancel_result;
5062 	int dir_nodeid;
5063 
5064 	ms_local = kmalloc(sizeof(*ms_local), GFP_KERNEL);
5065 	if (!ms_local)
5066 		return;
5067 
5068 	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5069 
5070 		dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5071 
5072 		/* exclude debug messages about unlocks because there can be so
5073 		   many and they aren't very interesting */
5074 
5075 		if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5076 			log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5077 				  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5078 				  lkb->lkb_id,
5079 				  lkb->lkb_remid,
5080 				  lkb->lkb_wait_type,
5081 				  lkb->lkb_resource->res_nodeid,
5082 				  lkb->lkb_nodeid,
5083 				  lkb->lkb_wait_nodeid,
5084 				  dir_nodeid);
5085 		}
5086 
5087 		/* all outstanding lookups, regardless of destination  will be
5088 		   resent after recovery is done */
5089 
5090 		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5091 			set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5092 			continue;
5093 		}
5094 
5095 		if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5096 			continue;
5097 
5098 		wait_type = lkb->lkb_wait_type;
5099 		local_unlock_result = -DLM_EUNLOCK;
5100 		local_cancel_result = -DLM_ECANCEL;
5101 
5102 		/* Main reply may have been received leaving a zero wait_type,
5103 		   but a reply for the overlapping op may not have been
5104 		   received.  In that case we need to fake the appropriate
5105 		   reply for the overlap op. */
5106 
5107 		if (!wait_type) {
5108 			if (is_overlap_cancel(lkb)) {
5109 				wait_type = DLM_MSG_CANCEL;
5110 				if (lkb->lkb_grmode == DLM_LOCK_IV)
5111 					local_cancel_result = 0;
5112 			}
5113 			if (is_overlap_unlock(lkb)) {
5114 				wait_type = DLM_MSG_UNLOCK;
5115 				if (lkb->lkb_grmode == DLM_LOCK_IV)
5116 					local_unlock_result = -ENOENT;
5117 			}
5118 
5119 			log_debug(ls, "rwpre overlap %x %x %d %d %d",
5120 				  lkb->lkb_id, dlm_iflags_val(lkb), wait_type,
5121 				  local_cancel_result, local_unlock_result);
5122 		}
5123 
5124 		switch (wait_type) {
5125 
5126 		case DLM_MSG_REQUEST:
5127 			set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5128 			break;
5129 
5130 		case DLM_MSG_CONVERT:
5131 			recover_convert_waiter(ls, lkb, ms_local);
5132 			break;
5133 
5134 		case DLM_MSG_UNLOCK:
5135 			hold_lkb(lkb);
5136 			memset(ms_local, 0, sizeof(struct dlm_message));
5137 			ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
5138 			ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result));
5139 			ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5140 			_receive_unlock_reply(lkb, ms_local, true);
5141 			dlm_put_lkb(lkb);
5142 			break;
5143 
5144 		case DLM_MSG_CANCEL:
5145 			hold_lkb(lkb);
5146 			memset(ms_local, 0, sizeof(struct dlm_message));
5147 			ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
5148 			ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result));
5149 			ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5150 			_receive_cancel_reply(lkb, ms_local, true);
5151 			dlm_put_lkb(lkb);
5152 			break;
5153 
5154 		default:
5155 			log_error(ls, "invalid lkb wait_type %d %d",
5156 				  lkb->lkb_wait_type, wait_type);
5157 		}
5158 		schedule();
5159 	}
5160 	kfree(ms_local);
5161 }
5162 
5163 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5164 {
5165 	struct dlm_lkb *lkb = NULL, *iter;
5166 
5167 	spin_lock_bh(&ls->ls_waiters_lock);
5168 	list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
5169 		if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
5170 			hold_lkb(iter);
5171 			lkb = iter;
5172 			break;
5173 		}
5174 	}
5175 	spin_unlock_bh(&ls->ls_waiters_lock);
5176 
5177 	return lkb;
5178 }
5179 
5180 /*
5181  * Forced state reset for locks that were in the middle of remote operations
5182  * when recovery happened (i.e. lkbs that were on the waiters list, waiting
5183  * for a reply from a remote operation.)  The lkbs remaining on the waiters
5184  * list need to be reevaluated; some may need resending to a different node
5185  * than previously, and some may now need local handling rather than remote.
5186  *
5187  * First, the lkb state for the voided remote operation is forcibly reset,
5188  * equivalent to what remove_from_waiters() would normally do:
5189  * . lkb removed from ls_waiters list
5190  * . lkb wait_type cleared
5191  * . lkb waiters_count cleared
5192  * . lkb ref count decremented for each waiters_count (almost always 1,
5193  *   but possibly 2 in case of cancel/unlock overlapping, which means
5194  *   two remote replies were being expected for the lkb.)
5195  *
5196  * Second, the lkb is reprocessed like an original operation would be,
5197  * by passing it to _request_lock or _convert_lock, which will either
5198  * process the lkb operation locally, or send it to a remote node again
5199  * and put the lkb back onto the waiters list.
5200  *
5201  * When reprocessing the lkb, we may find that it's flagged for an overlapping
5202  * force-unlock or cancel, either from before recovery began, or after recovery
5203  * finished.  If this is the case, the unlock/cancel is done directly, and the
5204  * original operation is not initiated again (no _request_lock/_convert_lock.)
5205  */
5206 
/*
 * dlm_recover_waiters_post() - reprocess waiters that were flagged for resend
 * @ls: lockspace whose ls_waiters list is drained of flagged lkbs
 *
 * Loops over find_resend_waiter(): each lkb found has its waiters state
 * forcibly reset under the rsb lock and is then either reprocessed through
 * _request_lock()/_convert_lock(), or, when an overlapping cancel/unlock bit
 * was set, completed directly as a cancel or unlock.
 *
 * Return: 0 once find_resend_waiter() finds nothing more, or -EINTR when
 * dlm_locking_stopped() reports true at the top of an iteration.
 */
int dlm_recover_waiters_post(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error = 0, mstype, err, oc, ou;

	while (1) {
		/* checked before every lkb so the loop can be abandoned */
		if (dlm_locking_stopped(ls)) {
			log_debug(ls, "recover_waiters_post aborted");
			error = -EINTR;
			break;
		}

		/*
		 * Find an lkb from the waiters list that's been affected by
		 * recovery node changes, and needs to be reprocessed.  Does
		 * hold_lkb(), adding a refcount.
		 */
		lkb = find_resend_waiter(ls);
		if (!lkb)
			break;

		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		/*
		 * If the lkb has been flagged for a force unlock or cancel,
		 * then the reprocessing below will be replaced by just doing
		 * the unlock/cancel directly.
		 *
		 * mstype is sampled here because lkb_wait_type is zeroed
		 * further down, before it is used to pick the action.
		 */
		mstype = lkb->lkb_wait_type;
		oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
					&lkb->lkb_iflags);
		ou = test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT,
					&lkb->lkb_iflags);
		err = 0;

		log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
			  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
			  "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
			  r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
			  dlm_dir_nodeid(r), oc, ou);

		/*
		 * No reply to the pre-recovery operation will now be received,
		 * so a forced equivalent of remove_from_waiters() is needed to
		 * reset the waiters state that was in place before recovery.
		 */

		clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);

		/* Forcibly clear wait_type */
		lkb->lkb_wait_type = 0;

		/*
		 * Forcibly reset wait_count and associated refcount.  The
		 * wait_count will almost always be 1, but in case of an
		 * overlapping unlock/cancel it could be 2: see where
		 * add_to_waiters() finds the lkb is already on the waiters
		 * list and does lkb_wait_count++; hold_lkb().
		 */
		while (lkb->lkb_wait_count) {
			lkb->lkb_wait_count--;
			unhold_lkb(lkb);
		}

		/* Forcibly remove from waiters list */
		spin_lock_bh(&ls->ls_waiters_lock);
		list_del_init(&lkb->lkb_wait_reply);
		spin_unlock_bh(&ls->ls_waiters_lock);

		/*
		 * The lkb is now clear of all prior waiters state and can be
		 * processed locally, or sent to remote node again, or directly
		 * cancelled/unlocked.
		 */

		if (oc || ou) {
			/* do an unlock or cancel instead of resending */
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				/* unlock takes precedence when both bits were set */
				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
							-DLM_ECANCEL);
				unhold_lkb(lkb); /* undoes create_lkb() */
				break;
			case DLM_MSG_CONVERT:
				if (oc) {
					queue_cast(r, lkb, -DLM_ECANCEL);
				} else {
					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
					_unlock_lock(r, lkb);
				}
				break;
			default:
				/* unexpected wait type, reported below */
				err = 1;
			}
		} else {
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				_request_lock(r, lkb);
				if (is_master(r))
					confirm_master(r, 0);
				break;
			case DLM_MSG_CONVERT:
				_convert_lock(r, lkb);
				break;
			default:
				/* unexpected wait type, reported below */
				err = 1;
			}
		}

		if (err) {
			log_error(ls, "waiter %x msg %d r_nodeid %d "
				  "dir_nodeid %d overlap %d %d",
				  lkb->lkb_id, mstype, r->res_nodeid,
				  dlm_dir_nodeid(r), oc, ou);
		}
		unlock_rsb(r);
		/* pairs with hold_rsb() above */
		put_rsb(r);
		/* drops the reference taken by find_resend_waiter() */
		dlm_put_lkb(lkb);
	}

	return error;
}
5334 
5335 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5336 			      struct list_head *list)
5337 {
5338 	struct dlm_lkb *lkb, *safe;
5339 
5340 	list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5341 		if (!is_master_copy(lkb))
5342 			continue;
5343 
5344 		/* don't purge lkbs we've added in recover_master_copy for
5345 		   the current recovery seq */
5346 
5347 		if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5348 			continue;
5349 
5350 		del_lkb(r, lkb);
5351 
5352 		/* this put should free the lkb */
5353 		if (!dlm_put_lkb(lkb))
5354 			log_error(ls, "purged mstcpy lkb not released");
5355 	}
5356 }
5357 
5358 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5359 {
5360 	struct dlm_ls *ls = r->res_ls;
5361 
5362 	purge_mstcpy_list(ls, r, &r->res_grantqueue);
5363 	purge_mstcpy_list(ls, r, &r->res_convertqueue);
5364 	purge_mstcpy_list(ls, r, &r->res_waitqueue);
5365 }
5366 
5367 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5368 			    struct list_head *list,
5369 			    int nodeid_gone, unsigned int *count)
5370 {
5371 	struct dlm_lkb *lkb, *safe;
5372 
5373 	list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5374 		if (!is_master_copy(lkb))
5375 			continue;
5376 
5377 		if ((lkb->lkb_nodeid == nodeid_gone) ||
5378 		    dlm_is_removed(ls, lkb->lkb_nodeid)) {
5379 
5380 			/* tell recover_lvb to invalidate the lvb
5381 			   because a node holding EX/PW failed */
5382 			if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5383 			    (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5384 				rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5385 			}
5386 
5387 			del_lkb(r, lkb);
5388 
5389 			/* this put should free the lkb */
5390 			if (!dlm_put_lkb(lkb))
5391 				log_error(ls, "purged dead lkb not released");
5392 
5393 			rsb_set_flag(r, RSB_RECOVER_GRANT);
5394 
5395 			(*count)++;
5396 		}
5397 	}
5398 }
5399 
5400 /* Get rid of locks held by nodes that are gone. */
5401 
5402 void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list)
5403 {
5404 	struct dlm_rsb *r;
5405 	struct dlm_member *memb;
5406 	int nodes_count = 0;
5407 	int nodeid_gone = 0;
5408 	unsigned int lkb_count = 0;
5409 
5410 	/* cache one removed nodeid to optimize the common
5411 	   case of a single node removed */
5412 
5413 	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5414 		nodes_count++;
5415 		nodeid_gone = memb->nodeid;
5416 	}
5417 
5418 	if (!nodes_count)
5419 		return;
5420 
5421 	list_for_each_entry(r, root_list, res_root_list) {
5422 		hold_rsb(r);
5423 		lock_rsb(r);
5424 		if (is_master(r)) {
5425 			purge_dead_list(ls, r, &r->res_grantqueue,
5426 					nodeid_gone, &lkb_count);
5427 			purge_dead_list(ls, r, &r->res_convertqueue,
5428 					nodeid_gone, &lkb_count);
5429 			purge_dead_list(ls, r, &r->res_waitqueue,
5430 					nodeid_gone, &lkb_count);
5431 		}
5432 		unlock_rsb(r);
5433 		unhold_rsb(r);
5434 		cond_resched();
5435 	}
5436 
5437 	if (lkb_count)
5438 		log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5439 			  lkb_count, nodes_count);
5440 }
5441 
5442 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
5443 {
5444 	struct dlm_rsb *r;
5445 
5446 	read_lock_bh(&ls->ls_rsbtbl_lock);
5447 	list_for_each_entry(r, &ls->ls_keep, res_rsbs_list) {
5448 		if (!rsb_flag(r, RSB_RECOVER_GRANT))
5449 			continue;
5450 		if (!is_master(r)) {
5451 			rsb_clear_flag(r, RSB_RECOVER_GRANT);
5452 			continue;
5453 		}
5454 		hold_rsb(r);
5455 		read_unlock_bh(&ls->ls_rsbtbl_lock);
5456 		return r;
5457 	}
5458 	read_unlock_bh(&ls->ls_rsbtbl_lock);
5459 	return NULL;
5460 }
5461 
5462 /*
5463  * Attempt to grant locks on resources that we are the master of.
5464  * Locks may have become grantable during recovery because locks
5465  * from departed nodes have been purged (or not rebuilt), allowing
5466  * previously blocked locks to now be granted.  The subset of rsb's
5467  * we are interested in are those with lkb's on either the convert or
5468  * waiting queues.
5469  *
5470  * Simplest would be to go through each master rsb and check for non-empty
5471  * convert or waiting queues, and attempt to grant on those rsbs.
5472  * Checking the queues requires lock_rsb, though, for which we'd need
5473  * to release the rsbtbl lock.  This would make iterating through all
5474  * rsb's very inefficient.  So, we rely on earlier recovery routines
5475  * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5476  * locks for.
5477  */
5478 
5479 void dlm_recover_grant(struct dlm_ls *ls)
5480 {
5481 	struct dlm_rsb *r;
5482 	unsigned int count = 0;
5483 	unsigned int rsb_count = 0;
5484 	unsigned int lkb_count = 0;
5485 
5486 	while (1) {
5487 		r = find_grant_rsb(ls);
5488 		if (!r)
5489 			break;
5490 
5491 		rsb_count++;
5492 		count = 0;
5493 		lock_rsb(r);
5494 		/* the RECOVER_GRANT flag is checked in the grant path */
5495 		grant_pending_locks(r, &count);
5496 		rsb_clear_flag(r, RSB_RECOVER_GRANT);
5497 		lkb_count += count;
5498 		confirm_master(r, 0);
5499 		unlock_rsb(r);
5500 		put_rsb(r);
5501 		cond_resched();
5502 	}
5503 
5504 	if (lkb_count)
5505 		log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5506 			  lkb_count, rsb_count);
5507 }
5508 
5509 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5510 					 uint32_t remid)
5511 {
5512 	struct dlm_lkb *lkb;
5513 
5514 	list_for_each_entry(lkb, head, lkb_statequeue) {
5515 		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5516 			return lkb;
5517 	}
5518 	return NULL;
5519 }
5520 
5521 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5522 				    uint32_t remid)
5523 {
5524 	struct dlm_lkb *lkb;
5525 
5526 	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5527 	if (lkb)
5528 		return lkb;
5529 	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5530 	if (lkb)
5531 		return lkb;
5532 	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5533 	if (lkb)
5534 		return lkb;
5535 	return NULL;
5536 }
5537 
5538 /* needs at least dlm_rcom + rcom_lock */
5539 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5540 				  struct dlm_rsb *r, const struct dlm_rcom *rc)
5541 {
5542 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5543 
5544 	lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5545 	lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5546 	lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5547 	lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5548 	dlm_set_dflags_val(lkb, le32_to_cpu(rl->rl_flags));
5549 	set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
5550 	lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5551 	lkb->lkb_rqmode = rl->rl_rqmode;
5552 	lkb->lkb_grmode = rl->rl_grmode;
5553 	/* don't set lkb_status because add_lkb wants to itself */
5554 
5555 	lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5556 	lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5557 
5558 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5559 		int lvblen = le16_to_cpu(rc->rc_header.h_length) -
5560 			sizeof(struct dlm_rcom) - sizeof(struct rcom_lock);
5561 		if (lvblen > ls->ls_lvblen)
5562 			return -EINVAL;
5563 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5564 		if (!lkb->lkb_lvbptr)
5565 			return -ENOMEM;
5566 		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5567 	}
5568 
5569 	/* Conversions between PR and CW (middle modes) need special handling.
5570 	   The real granted mode of these converting locks cannot be determined
5571 	   until all locks have been rebuilt on the rsb (recover_conversion) */
5572 
5573 	if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
5574 	    middle_conversion(lkb)) {
5575 		rl->rl_status = DLM_LKSTS_CONVERT;
5576 		lkb->lkb_grmode = DLM_LOCK_IV;
5577 		rsb_set_flag(r, RSB_RECOVER_CONVERT);
5578 	}
5579 
5580 	return 0;
5581 }
5582 
5583 /* This lkb may have been recovered in a previous aborted recovery so we need
5584    to check if the rsb already has an lkb with the given remote nodeid/lkid.
5585    If so we just send back a standard reply.  If not, we create a new lkb with
5586    the given values and send back our lkid.  We send back our lkid by sending
5587    back the rcom_lock struct we got but with the remid field filled in. */
5588 
5589 /* needs at least dlm_rcom + rcom_lock */
5590 int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5591 			    __le32 *rl_remid, __le32 *rl_result)
5592 {
5593 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5594 	struct dlm_rsb *r;
5595 	struct dlm_lkb *lkb;
5596 	uint32_t remid = 0;
5597 	int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5598 	int error;
5599 
5600 	/* init rl_remid with rcom lock rl_remid */
5601 	*rl_remid = rl->rl_remid;
5602 
5603 	if (rl->rl_parent_lkid) {
5604 		error = -EOPNOTSUPP;
5605 		goto out;
5606 	}
5607 
5608 	remid = le32_to_cpu(rl->rl_lkid);
5609 
5610 	/* In general we expect the rsb returned to be R_MASTER, but we don't
5611 	   have to require it.  Recovery of masters on one node can overlap
5612 	   recovery of locks on another node, so one node can send us MSTCPY
5613 	   locks before we've made ourselves master of this rsb.  We can still
5614 	   add new MSTCPY locks that we receive here without any harm; when
5615 	   we make ourselves master, dlm_recover_masters() won't touch the
5616 	   MSTCPY locks we've received early. */
5617 
5618 	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5619 			 from_nodeid, R_RECEIVE_RECOVER, &r);
5620 	if (error)
5621 		goto out;
5622 
5623 	lock_rsb(r);
5624 
5625 	if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5626 		log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5627 			  from_nodeid, remid);
5628 		error = -EBADR;
5629 		goto out_unlock;
5630 	}
5631 
5632 	lkb = search_remid(r, from_nodeid, remid);
5633 	if (lkb) {
5634 		error = -EEXIST;
5635 		goto out_remid;
5636 	}
5637 
5638 	error = create_lkb(ls, &lkb);
5639 	if (error)
5640 		goto out_unlock;
5641 
5642 	error = receive_rcom_lock_args(ls, lkb, r, rc);
5643 	if (error) {
5644 		__put_lkb(ls, lkb);
5645 		goto out_unlock;
5646 	}
5647 
5648 	attach_lkb(r, lkb);
5649 	add_lkb(r, lkb, rl->rl_status);
5650 	ls->ls_recover_locks_in++;
5651 
5652 	if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5653 		rsb_set_flag(r, RSB_RECOVER_GRANT);
5654 
5655  out_remid:
5656 	/* this is the new value returned to the lock holder for
5657 	   saving in its process-copy lkb */
5658 	*rl_remid = cpu_to_le32(lkb->lkb_id);
5659 
5660 	lkb->lkb_recover_seq = ls->ls_recover_seq;
5661 
5662  out_unlock:
5663 	unlock_rsb(r);
5664 	put_rsb(r);
5665  out:
5666 	if (error && error != -EEXIST)
5667 		log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5668 			  from_nodeid, remid, error);
5669 	*rl_result = cpu_to_le32(error);
5670 	return error;
5671 }
5672 
5673 /* needs at least dlm_rcom + rcom_lock */
5674 int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5675 			     uint64_t seq)
5676 {
5677 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5678 	struct dlm_rsb *r;
5679 	struct dlm_lkb *lkb;
5680 	uint32_t lkid, remid;
5681 	int error, result;
5682 
5683 	lkid = le32_to_cpu(rl->rl_lkid);
5684 	remid = le32_to_cpu(rl->rl_remid);
5685 	result = le32_to_cpu(rl->rl_result);
5686 
5687 	error = find_lkb(ls, lkid, &lkb);
5688 	if (error) {
5689 		log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5690 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5691 			  result);
5692 		return error;
5693 	}
5694 
5695 	r = lkb->lkb_resource;
5696 	hold_rsb(r);
5697 	lock_rsb(r);
5698 
5699 	if (!is_process_copy(lkb)) {
5700 		log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5701 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5702 			  result);
5703 		dlm_dump_rsb(r);
5704 		unlock_rsb(r);
5705 		put_rsb(r);
5706 		dlm_put_lkb(lkb);
5707 		return -EINVAL;
5708 	}
5709 
5710 	switch (result) {
5711 	case -EBADR:
5712 		/* There's a chance the new master received our lock before
5713 		   dlm_recover_master_reply(), this wouldn't happen if we did
5714 		   a barrier between recover_masters and recover_locks. */
5715 
5716 		log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5717 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5718 			  result);
5719 
5720 		dlm_send_rcom_lock(r, lkb, seq);
5721 		goto out;
5722 	case -EEXIST:
5723 	case 0:
5724 		lkb->lkb_remid = remid;
5725 		break;
5726 	default:
5727 		log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5728 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5729 			  result);
5730 	}
5731 
5732 	/* an ack for dlm_recover_locks() which waits for replies from
5733 	   all the locks it sends to new masters */
5734 	dlm_recovered_lock(r);
5735  out:
5736 	unlock_rsb(r);
5737 	put_rsb(r);
5738 	dlm_put_lkb(lkb);
5739 
5740 	return 0;
5741 }
5742 
5743 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5744 		     int mode, uint32_t flags, void *name, unsigned int namelen)
5745 {
5746 	struct dlm_lkb *lkb;
5747 	struct dlm_args args;
5748 	bool do_put = true;
5749 	int error;
5750 
5751 	dlm_lock_recovery(ls);
5752 
5753 	error = create_lkb(ls, &lkb);
5754 	if (error) {
5755 		kfree(ua);
5756 		goto out;
5757 	}
5758 
5759 	trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
5760 
5761 	if (flags & DLM_LKF_VALBLK) {
5762 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5763 		if (!ua->lksb.sb_lvbptr) {
5764 			kfree(ua);
5765 			error = -ENOMEM;
5766 			goto out_put;
5767 		}
5768 	}
5769 	error = set_lock_args(mode, &ua->lksb, flags, namelen, fake_astfn, ua,
5770 			      fake_bastfn, &args);
5771 	if (error) {
5772 		kfree(ua->lksb.sb_lvbptr);
5773 		ua->lksb.sb_lvbptr = NULL;
5774 		kfree(ua);
5775 		goto out_put;
5776 	}
5777 
5778 	/* After ua is attached to lkb it will be freed by dlm_free_lkb().
5779 	   When DLM_DFL_USER_BIT is set, the dlm knows that this is a userspace
5780 	   lock and that lkb_astparam is the dlm_user_args structure. */
5781 	set_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags);
5782 	error = request_lock(ls, lkb, name, namelen, &args);
5783 
5784 	switch (error) {
5785 	case 0:
5786 		break;
5787 	case -EINPROGRESS:
5788 		error = 0;
5789 		break;
5790 	case -EAGAIN:
5791 		error = 0;
5792 		fallthrough;
5793 	default:
5794 		goto out_put;
5795 	}
5796 
5797 	/* add this new lkb to the per-process list of locks */
5798 	spin_lock_bh(&ua->proc->locks_spin);
5799 	hold_lkb(lkb);
5800 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5801 	spin_unlock_bh(&ua->proc->locks_spin);
5802 	do_put = false;
5803  out_put:
5804 	trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
5805 	if (do_put)
5806 		__put_lkb(ls, lkb);
5807  out:
5808 	dlm_unlock_recovery(ls);
5809 	return error;
5810 }
5811 
5812 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5813 		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
5814 {
5815 	struct dlm_lkb *lkb;
5816 	struct dlm_args args;
5817 	struct dlm_user_args *ua;
5818 	int error;
5819 
5820 	dlm_lock_recovery(ls);
5821 
5822 	error = find_lkb(ls, lkid, &lkb);
5823 	if (error)
5824 		goto out;
5825 
5826 	trace_dlm_lock_start(ls, lkb, NULL, 0, mode, flags);
5827 
5828 	/* user can change the params on its lock when it converts it, or
5829 	   add an lvb that didn't exist before */
5830 
5831 	ua = lkb->lkb_ua;
5832 
5833 	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5834 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5835 		if (!ua->lksb.sb_lvbptr) {
5836 			error = -ENOMEM;
5837 			goto out_put;
5838 		}
5839 	}
5840 	if (lvb_in && ua->lksb.sb_lvbptr)
5841 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5842 
5843 	ua->xid = ua_tmp->xid;
5844 	ua->castparam = ua_tmp->castparam;
5845 	ua->castaddr = ua_tmp->castaddr;
5846 	ua->bastparam = ua_tmp->bastparam;
5847 	ua->bastaddr = ua_tmp->bastaddr;
5848 	ua->user_lksb = ua_tmp->user_lksb;
5849 
5850 	error = set_lock_args(mode, &ua->lksb, flags, 0, fake_astfn, ua,
5851 			      fake_bastfn, &args);
5852 	if (error)
5853 		goto out_put;
5854 
5855 	error = convert_lock(ls, lkb, &args);
5856 
5857 	if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5858 		error = 0;
5859  out_put:
5860 	trace_dlm_lock_end(ls, lkb, NULL, 0, mode, flags, error, false);
5861 	dlm_put_lkb(lkb);
5862  out:
5863 	dlm_unlock_recovery(ls);
5864 	kfree(ua_tmp);
5865 	return error;
5866 }
5867 
5868 /*
5869  * The caller asks for an orphan lock on a given resource with a given mode.
5870  * If a matching lock exists, it's moved to the owner's list of locks and
5871  * the lkid is returned.
5872  */
5873 
5874 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5875 		     int mode, uint32_t flags, void *name, unsigned int namelen,
5876 		     uint32_t *lkid)
5877 {
5878 	struct dlm_lkb *lkb = NULL, *iter;
5879 	struct dlm_user_args *ua;
5880 	int found_other_mode = 0;
5881 	int rv = 0;
5882 
5883 	spin_lock_bh(&ls->ls_orphans_lock);
5884 	list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
5885 		if (iter->lkb_resource->res_length != namelen)
5886 			continue;
5887 		if (memcmp(iter->lkb_resource->res_name, name, namelen))
5888 			continue;
5889 		if (iter->lkb_grmode != mode) {
5890 			found_other_mode = 1;
5891 			continue;
5892 		}
5893 
5894 		lkb = iter;
5895 		list_del_init(&iter->lkb_ownqueue);
5896 		clear_bit(DLM_DFL_ORPHAN_BIT, &iter->lkb_dflags);
5897 		*lkid = iter->lkb_id;
5898 		break;
5899 	}
5900 	spin_unlock_bh(&ls->ls_orphans_lock);
5901 
5902 	if (!lkb && found_other_mode) {
5903 		rv = -EAGAIN;
5904 		goto out;
5905 	}
5906 
5907 	if (!lkb) {
5908 		rv = -ENOENT;
5909 		goto out;
5910 	}
5911 
5912 	lkb->lkb_exflags = flags;
5913 	lkb->lkb_ownpid = (int) current->pid;
5914 
5915 	ua = lkb->lkb_ua;
5916 
5917 	ua->proc = ua_tmp->proc;
5918 	ua->xid = ua_tmp->xid;
5919 	ua->castparam = ua_tmp->castparam;
5920 	ua->castaddr = ua_tmp->castaddr;
5921 	ua->bastparam = ua_tmp->bastparam;
5922 	ua->bastaddr = ua_tmp->bastaddr;
5923 	ua->user_lksb = ua_tmp->user_lksb;
5924 
5925 	/*
5926 	 * The lkb reference from the ls_orphans list was not
5927 	 * removed above, and is now considered the reference
5928 	 * for the proc locks list.
5929 	 */
5930 
5931 	spin_lock_bh(&ua->proc->locks_spin);
5932 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5933 	spin_unlock_bh(&ua->proc->locks_spin);
5934  out:
5935 	kfree(ua_tmp);
5936 	return rv;
5937 }
5938 
5939 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5940 		    uint32_t flags, uint32_t lkid, char *lvb_in)
5941 {
5942 	struct dlm_lkb *lkb;
5943 	struct dlm_args args;
5944 	struct dlm_user_args *ua;
5945 	int error;
5946 
5947 	dlm_lock_recovery(ls);
5948 
5949 	error = find_lkb(ls, lkid, &lkb);
5950 	if (error)
5951 		goto out;
5952 
5953 	trace_dlm_unlock_start(ls, lkb, flags);
5954 
5955 	ua = lkb->lkb_ua;
5956 
5957 	if (lvb_in && ua->lksb.sb_lvbptr)
5958 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5959 	if (ua_tmp->castparam)
5960 		ua->castparam = ua_tmp->castparam;
5961 	ua->user_lksb = ua_tmp->user_lksb;
5962 
5963 	error = set_unlock_args(flags, ua, &args);
5964 	if (error)
5965 		goto out_put;
5966 
5967 	error = unlock_lock(ls, lkb, &args);
5968 
5969 	if (error == -DLM_EUNLOCK)
5970 		error = 0;
5971 	/* from validate_unlock_args() */
5972 	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5973 		error = 0;
5974 	if (error)
5975 		goto out_put;
5976 
5977 	spin_lock_bh(&ua->proc->locks_spin);
5978 	/* dlm_user_add_cb() may have already taken lkb off the proc list */
5979 	if (!list_empty(&lkb->lkb_ownqueue))
5980 		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5981 	spin_unlock_bh(&ua->proc->locks_spin);
5982  out_put:
5983 	trace_dlm_unlock_end(ls, lkb, flags, error);
5984 	dlm_put_lkb(lkb);
5985  out:
5986 	dlm_unlock_recovery(ls);
5987 	kfree(ua_tmp);
5988 	return error;
5989 }
5990 
5991 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5992 		    uint32_t flags, uint32_t lkid)
5993 {
5994 	struct dlm_lkb *lkb;
5995 	struct dlm_args args;
5996 	struct dlm_user_args *ua;
5997 	int error;
5998 
5999 	dlm_lock_recovery(ls);
6000 
6001 	error = find_lkb(ls, lkid, &lkb);
6002 	if (error)
6003 		goto out;
6004 
6005 	trace_dlm_unlock_start(ls, lkb, flags);
6006 
6007 	ua = lkb->lkb_ua;
6008 	if (ua_tmp->castparam)
6009 		ua->castparam = ua_tmp->castparam;
6010 	ua->user_lksb = ua_tmp->user_lksb;
6011 
6012 	error = set_unlock_args(flags, ua, &args);
6013 	if (error)
6014 		goto out_put;
6015 
6016 	error = cancel_lock(ls, lkb, &args);
6017 
6018 	if (error == -DLM_ECANCEL)
6019 		error = 0;
6020 	/* from validate_unlock_args() */
6021 	if (error == -EBUSY)
6022 		error = 0;
6023  out_put:
6024 	trace_dlm_unlock_end(ls, lkb, flags, error);
6025 	dlm_put_lkb(lkb);
6026  out:
6027 	dlm_unlock_recovery(ls);
6028 	kfree(ua_tmp);
6029 	return error;
6030 }
6031 
6032 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
6033 {
6034 	struct dlm_lkb *lkb;
6035 	struct dlm_args args;
6036 	struct dlm_user_args *ua;
6037 	struct dlm_rsb *r;
6038 	int error;
6039 
6040 	dlm_lock_recovery(ls);
6041 
6042 	error = find_lkb(ls, lkid, &lkb);
6043 	if (error)
6044 		goto out;
6045 
6046 	trace_dlm_unlock_start(ls, lkb, flags);
6047 
6048 	ua = lkb->lkb_ua;
6049 
6050 	error = set_unlock_args(flags, ua, &args);
6051 	if (error)
6052 		goto out_put;
6053 
6054 	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
6055 
6056 	r = lkb->lkb_resource;
6057 	hold_rsb(r);
6058 	lock_rsb(r);
6059 
6060 	error = validate_unlock_args(lkb, &args);
6061 	if (error)
6062 		goto out_r;
6063 	set_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags);
6064 
6065 	error = _cancel_lock(r, lkb);
6066  out_r:
6067 	unlock_rsb(r);
6068 	put_rsb(r);
6069 
6070 	if (error == -DLM_ECANCEL)
6071 		error = 0;
6072 	/* from validate_unlock_args() */
6073 	if (error == -EBUSY)
6074 		error = 0;
6075  out_put:
6076 	trace_dlm_unlock_end(ls, lkb, flags, error);
6077 	dlm_put_lkb(lkb);
6078  out:
6079 	dlm_unlock_recovery(ls);
6080 	return error;
6081 }
6082 
6083 /* lkb's that are removed from the waiters list by revert are just left on the
6084    orphans list with the granted orphan locks, to be freed by purge */
6085 
6086 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6087 {
6088 	struct dlm_args args;
6089 	int error;
6090 
6091 	hold_lkb(lkb); /* reference for the ls_orphans list */
6092 	spin_lock_bh(&ls->ls_orphans_lock);
6093 	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6094 	spin_unlock_bh(&ls->ls_orphans_lock);
6095 
6096 	set_unlock_args(0, lkb->lkb_ua, &args);
6097 
6098 	error = cancel_lock(ls, lkb, &args);
6099 	if (error == -DLM_ECANCEL)
6100 		error = 0;
6101 	return error;
6102 }
6103 
6104 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
6105    granted.  Regardless of what rsb queue the lock is on, it's removed and
6106    freed.  The IVVALBLK flag causes the lvb on the resource to be invalidated
6107    if our lock is PW/EX (it's ignored if our granted mode is smaller.) */
6108 
6109 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6110 {
6111 	struct dlm_args args;
6112 	int error;
6113 
6114 	set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
6115 			lkb->lkb_ua, &args);
6116 
6117 	error = unlock_lock(ls, lkb, &args);
6118 	if (error == -DLM_EUNLOCK)
6119 		error = 0;
6120 	return error;
6121 }
6122 
/* We have to release the ls_clear_proc_locks spinlock before calling
   unlock_proc_lock() (which does lock_rsb) due to deadlock with receiving a
   message that does lock_rsb followed by dlm_user_add_cb() */
6126 
6127 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
6128 				     struct dlm_user_proc *proc)
6129 {
6130 	struct dlm_lkb *lkb = NULL;
6131 
6132 	spin_lock_bh(&ls->ls_clear_proc_locks);
6133 	if (list_empty(&proc->locks))
6134 		goto out;
6135 
6136 	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
6137 	list_del_init(&lkb->lkb_ownqueue);
6138 
6139 	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6140 		set_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags);
6141 	else
6142 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6143  out:
6144 	spin_unlock_bh(&ls->ls_clear_proc_locks);
6145 	return lkb;
6146 }
6147 
/* The ls_clear_proc_locks spinlock protects against dlm_user_add_cb() which
   1) references lkb->lkb_ua which we free here and 2) adds callbacks to
   proc->asts, which we clear here. */
6151 
/* The proc CLOSING flag is set, so no more device_reads should look at the
   proc->asts list, and no more device_writes should add lkb's to the
   proc->locks list; so we shouldn't need to take asts_spin or locks_spin
   here.  This assumes that device reads/writes/closes are serialized --
   FIXME: we may need to serialize them ourselves. */
6157 
6158 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6159 {
6160 	struct dlm_callback *cb, *cb_safe;
6161 	struct dlm_lkb *lkb, *safe;
6162 
6163 	dlm_lock_recovery(ls);
6164 
6165 	while (1) {
6166 		lkb = del_proc_lock(ls, proc);
6167 		if (!lkb)
6168 			break;
6169 		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6170 			orphan_proc_lock(ls, lkb);
6171 		else
6172 			unlock_proc_lock(ls, lkb);
6173 
6174 		/* this removes the reference for the proc->locks list
6175 		   added by dlm_user_request, it may result in the lkb
6176 		   being freed */
6177 
6178 		dlm_put_lkb(lkb);
6179 	}
6180 
6181 	spin_lock_bh(&ls->ls_clear_proc_locks);
6182 
6183 	/* in-progress unlocks */
6184 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6185 		list_del_init(&lkb->lkb_ownqueue);
6186 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6187 		dlm_put_lkb(lkb);
6188 	}
6189 
6190 	list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6191 		list_del(&cb->list);
6192 		dlm_free_cb(cb);
6193 	}
6194 
6195 	spin_unlock_bh(&ls->ls_clear_proc_locks);
6196 	dlm_unlock_recovery(ls);
6197 }
6198 
6199 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6200 {
6201 	struct dlm_callback *cb, *cb_safe;
6202 	struct dlm_lkb *lkb, *safe;
6203 
6204 	while (1) {
6205 		lkb = NULL;
6206 		spin_lock_bh(&proc->locks_spin);
6207 		if (!list_empty(&proc->locks)) {
6208 			lkb = list_entry(proc->locks.next, struct dlm_lkb,
6209 					 lkb_ownqueue);
6210 			list_del_init(&lkb->lkb_ownqueue);
6211 		}
6212 		spin_unlock_bh(&proc->locks_spin);
6213 
6214 		if (!lkb)
6215 			break;
6216 
6217 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6218 		unlock_proc_lock(ls, lkb);
6219 		dlm_put_lkb(lkb); /* ref from proc->locks list */
6220 	}
6221 
6222 	spin_lock_bh(&proc->locks_spin);
6223 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6224 		list_del_init(&lkb->lkb_ownqueue);
6225 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6226 		dlm_put_lkb(lkb);
6227 	}
6228 	spin_unlock_bh(&proc->locks_spin);
6229 
6230 	spin_lock_bh(&proc->asts_spin);
6231 	list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6232 		list_del(&cb->list);
6233 		dlm_free_cb(cb);
6234 	}
6235 	spin_unlock_bh(&proc->asts_spin);
6236 }
6237 
6238 /* pid of 0 means purge all orphans */
6239 
6240 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6241 {
6242 	struct dlm_lkb *lkb, *safe;
6243 
6244 	spin_lock_bh(&ls->ls_orphans_lock);
6245 	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6246 		if (pid && lkb->lkb_ownpid != pid)
6247 			continue;
6248 		unlock_proc_lock(ls, lkb);
6249 		list_del_init(&lkb->lkb_ownqueue);
6250 		dlm_put_lkb(lkb);
6251 	}
6252 	spin_unlock_bh(&ls->ls_orphans_lock);
6253 }
6254 
6255 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6256 {
6257 	struct dlm_message *ms;
6258 	struct dlm_mhandle *mh;
6259 	int error;
6260 
6261 	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6262 				DLM_MSG_PURGE, &ms, &mh);
6263 	if (error)
6264 		return error;
6265 	ms->m_nodeid = cpu_to_le32(nodeid);
6266 	ms->m_pid = cpu_to_le32(pid);
6267 
6268 	return send_message(mh, ms, NULL, 0);
6269 }
6270 
6271 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6272 		   int nodeid, int pid)
6273 {
6274 	int error = 0;
6275 
6276 	if (nodeid && (nodeid != dlm_our_nodeid())) {
6277 		error = send_purge(ls, nodeid, pid);
6278 	} else {
6279 		dlm_lock_recovery(ls);
6280 		if (pid == current->pid)
6281 			purge_proc_locks(ls, proc);
6282 		else
6283 			do_purge(ls, nodeid, pid);
6284 		dlm_unlock_recovery(ls);
6285 	}
6286 	return error;
6287 }
6288 
6289 /* debug functionality */
6290 int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
6291 		      int lkb_nodeid, unsigned int lkb_dflags, int lkb_status)
6292 {
6293 	struct dlm_lksb *lksb;
6294 	struct dlm_lkb *lkb;
6295 	struct dlm_rsb *r;
6296 	int error;
6297 
6298 	/* we currently can't set a valid user lock */
6299 	if (lkb_dflags & BIT(DLM_DFL_USER_BIT))
6300 		return -EOPNOTSUPP;
6301 
6302 	lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
6303 	if (!lksb)
6304 		return -ENOMEM;
6305 
6306 	error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
6307 	if (error) {
6308 		kfree(lksb);
6309 		return error;
6310 	}
6311 
6312 	dlm_set_dflags_val(lkb, lkb_dflags);
6313 	lkb->lkb_nodeid = lkb_nodeid;
6314 	lkb->lkb_lksb = lksb;
6315 	/* user specific pointer, just don't have it NULL for kernel locks */
6316 	if (~lkb_dflags & BIT(DLM_DFL_USER_BIT))
6317 		lkb->lkb_astparam = (void *)0xDEADBEEF;
6318 
6319 	error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
6320 	if (error) {
6321 		kfree(lksb);
6322 		__put_lkb(ls, lkb);
6323 		return error;
6324 	}
6325 
6326 	lock_rsb(r);
6327 	attach_lkb(r, lkb);
6328 	add_lkb(r, lkb, lkb_status);
6329 	unlock_rsb(r);
6330 	put_rsb(r);
6331 
6332 	return 0;
6333 }
6334 
6335 int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
6336 				 int mstype, int to_nodeid)
6337 {
6338 	struct dlm_lkb *lkb;
6339 	int error;
6340 
6341 	error = find_lkb(ls, lkb_id, &lkb);
6342 	if (error)
6343 		return error;
6344 
6345 	error = add_to_waiters(lkb, mstype, to_nodeid);
6346 	dlm_put_lkb(lkb);
6347 	return error;
6348 }
6349 
6350