xref: /linux/fs/dlm/lock.c (revision 17e548405a81665fd14cee960db7d093d1396400)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
6 **
7 **
8 *******************************************************************************
9 ******************************************************************************/
10 
11 /* Central locking logic has four stages:
12 
13    dlm_lock()
14    dlm_unlock()
15 
16    request_lock(ls, lkb)
17    convert_lock(ls, lkb)
18    unlock_lock(ls, lkb)
19    cancel_lock(ls, lkb)
20 
21    _request_lock(r, lkb)
22    _convert_lock(r, lkb)
23    _unlock_lock(r, lkb)
24    _cancel_lock(r, lkb)
25 
26    do_request(r, lkb)
27    do_convert(r, lkb)
28    do_unlock(r, lkb)
29    do_cancel(r, lkb)
30 
31    Stage 1 (lock, unlock) is mainly about checking input args and
32    splitting into one of the four main operations:
33 
34        dlm_lock          = request_lock
35        dlm_lock+CONVERT  = convert_lock
36        dlm_unlock        = unlock_lock
37        dlm_unlock+CANCEL = cancel_lock
38 
39    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
40    provided to the next stage.
41 
42    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
43    When remote, it calls send_xxxx(), when local it calls do_xxxx().
44 
45    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
46    given rsb and lkb and queues callbacks.
47 
48    For remote operations, send_xxxx() results in the corresponding do_xxxx()
49    function being executed on the remote node.  The connecting send/receive
50    calls on local (L) and remote (R) nodes:
51 
52    L: send_xxxx()              ->  R: receive_xxxx()
53                                    R: do_xxxx()
54    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
55 */
56 #include <trace/events/dlm.h>
57 
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "midcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77 
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89 				    const struct dlm_message *ms, bool local);
90 static int receive_extralen(const struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void deactivate_rsb(struct kref *kref);
93 
/*
 * Lock mode compatibility matrix - thanks Steve
 *
 * UN = Unlocked state. Not really a state, used as a flag.
 * PD = Padding. Used to make the matrix a nice power of two in size.
 * Other states are the same as the VMS DLM.
 *
 * Each row label is given in front of its row, the column labels are
 * given in the first line of the initializer.
 * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
 */

static const int __dlm_compat_matrix[8][8] = {
	/*          UN NL CR CW PR PW EX PD */
	/* UN */ {  1, 1, 1, 1, 1, 1, 1, 0 },
	/* NL */ {  1, 1, 1, 1, 1, 1, 1, 0 },
	/* CR */ {  1, 1, 1, 1, 1, 1, 0, 0 },
	/* CW */ {  1, 1, 1, 1, 0, 0, 0, 0 },
	/* PR */ {  1, 1, 1, 0, 1, 0, 0, 0 },
	/* PW */ {  1, 1, 1, 0, 0, 0, 0, 0 },
	/* EX */ {  1, 1, 0, 0, 0, 0, 0, 0 },
	/* PD */ {  0, 0, 0, 0, 0, 0, 0, 0 },
};
113 
/*
 * Direction of transfer of LVB data.
 *
 * The granted mode selects the row and the requested mode selects the
 * column, i.e. matrix[grmode+1][rqmode+1].  The entries mean:
 *    1 = LVB is returned to the caller
 *    0 = LVB is written to the resource
 *   -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
	/*           UN  NL  CR  CW  PR  PW  EX  PD */
	/* UN */ {   -1,  1,  1,  1,  1,  1,  1, -1 },
	/* NL */ {   -1,  1,  1,  1,  1,  1,  1,  0 },
	/* CR */ {   -1, -1,  1,  1,  1,  1,  1,  0 },
	/* CW */ {   -1, -1, -1,  1,  1,  1,  1,  0 },
	/* PR */ {   -1, -1, -1, -1,  1,  1,  1,  0 },
	/* PW */ {   -1,  0,  0,  0,  0,  0,  1,  0 },
	/* EX */ {   -1,  0,  0,  0,  0,  0,  0,  0 },
	/* PD */ {   -1,  0,  0,  0,  0,  0,  0,  0 },
};
134 
/*
 * Look up, in __dlm_compat_matrix, the entry selected by the granted
 * mode of lkb @gr (row) and the requested mode of lkb @rq (column).
 * Both modes are offset by one to form the matrix index.
 */
#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
137 
138 int dlm_modes_compat(int mode1, int mode2)
139 {
140 	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
141 }
142 
/*
 * Compatibility matrix for conversions with QUECVT set.
 *
 * The granted mode selects the row and the requested mode selects the
 * column, i.e. matrix[grmode+1][rqmode+1].
 */

static const int __quecvt_compat_matrix[8][8] = {
	/*          UN NL CR CW PR PW EX PD */
	/* UN */ {  0, 0, 0, 0, 0, 0, 0, 0 },
	/* NL */ {  0, 0, 1, 1, 1, 1, 1, 0 },
	/* CR */ {  0, 0, 0, 1, 1, 1, 1, 0 },
	/* CW */ {  0, 0, 0, 0, 1, 1, 1, 0 },
	/* PR */ {  0, 0, 0, 1, 0, 1, 1, 0 },
	/* PW */ {  0, 0, 0, 0, 0, 0, 1, 0 },
	/* EX */ {  0, 0, 0, 0, 0, 0, 0, 0 },
	/* PD */ {  0, 0, 0, 0, 0, 0, 0, 0 },
};
160 
161 void dlm_print_lkb(struct dlm_lkb *lkb)
162 {
163 	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
164 	       "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
165 	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166 	       dlm_iflags_val(lkb), lkb->lkb_status, lkb->lkb_rqmode,
167 	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
168 	       (unsigned long long)lkb->lkb_recover_seq);
169 }
170 
171 static void dlm_print_rsb(struct dlm_rsb *r)
172 {
173 	printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
174 	       "rlc %d name %s\n",
175 	       r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
176 	       r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
177 	       r->res_name);
178 }
179 
180 void dlm_dump_rsb(struct dlm_rsb *r)
181 {
182 	struct dlm_lkb *lkb;
183 
184 	dlm_print_rsb(r);
185 
186 	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
187 	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
188 	printk(KERN_ERR "rsb lookup list\n");
189 	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
190 		dlm_print_lkb(lkb);
191 	printk(KERN_ERR "rsb grant queue:\n");
192 	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
193 		dlm_print_lkb(lkb);
194 	printk(KERN_ERR "rsb convert queue:\n");
195 	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
196 		dlm_print_lkb(lkb);
197 	printk(KERN_ERR "rsb wait queue:\n");
198 	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
199 		dlm_print_lkb(lkb);
200 }
201 
202 /* Threads cannot use the lockspace while it's being recovered */
203 
/*
 * Take ls->ls_in_recovery for reading, blocking until it is available.
 * Paired with dlm_unlock_recovery().
 */
void dlm_lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}
208 
/*
 * Drop the read hold on ls->ls_in_recovery taken by dlm_lock_recovery()
 * or by a successful dlm_lock_recovery_try().
 */
void dlm_unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}
213 
/*
 * Non-blocking variant of dlm_lock_recovery(): try to take
 * ls->ls_in_recovery for reading and return the result of
 * down_read_trylock() unchanged.
 */
int dlm_lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}
218 
219 static inline int can_be_queued(struct dlm_lkb *lkb)
220 {
221 	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
222 }
223 
224 static inline int force_blocking_asts(struct dlm_lkb *lkb)
225 {
226 	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
227 }
228 
/* Test whether DLM_SBF_DEMOTED_BIT is set in the lkb's sbflags. */
static inline int is_demoted(struct dlm_lkb *lkb)
{
	return test_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
}
233 
/* Test whether DLM_SBF_ALTMODE_BIT is set in the lkb's sbflags. */
static inline int is_altmode(struct dlm_lkb *lkb)
{
	return test_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
}
238 
239 static inline int is_granted(struct dlm_lkb *lkb)
240 {
241 	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
242 }
243 
244 static inline int is_remote(struct dlm_rsb *r)
245 {
246 	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
247 	return !!r->res_nodeid;
248 }
249 
250 static inline int is_process_copy(struct dlm_lkb *lkb)
251 {
252 	return lkb->lkb_nodeid &&
253 	       !test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
254 }
255 
/* Test whether DLM_IFL_MSTCPY_BIT is set in the lkb's iflags. */
static inline int is_master_copy(struct dlm_lkb *lkb)
{
	return test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
}
260 
261 static inline int middle_conversion(struct dlm_lkb *lkb)
262 {
263 	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264 	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265 		return 1;
266 	return 0;
267 }
268 
269 static inline int down_conversion(struct dlm_lkb *lkb)
270 {
271 	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272 }
273 
/* Test whether DLM_IFL_OVERLAP_UNLOCK_BIT is set in the lkb's iflags. */
static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
	return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
}
278 
/* Test whether DLM_IFL_OVERLAP_CANCEL_BIT is set in the lkb's iflags. */
static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
	return test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
}
283 
/*
 * Return 1 when either overlap flag (unlock or cancel) is set in the
 * lkb's iflags, otherwise 0.
 */
static inline int is_overlap(struct dlm_lkb *lkb)
{
	return is_overlap_unlock(lkb) || is_overlap_cancel(lkb);
}
289 
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291 {
292 	if (is_master_copy(lkb))
293 		return;
294 
295 	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
296 
297 	if (rv == -DLM_ECANCEL &&
298 	    test_and_clear_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags))
299 		rv = -EDEADLK;
300 
301 	dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, dlm_sbflags_val(lkb));
302 }
303 
304 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
305 {
306 	queue_cast(r, lkb,
307 		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
308 }
309 
310 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
311 {
312 	if (is_master_copy(lkb)) {
313 		send_bast(r, lkb, rqmode);
314 	} else {
315 		dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
316 	}
317 }
318 
319 /*
320  * Basic operations on rsb's and lkb's
321  */
322 
323 static inline unsigned long rsb_toss_jiffies(void)
324 {
325 	return jiffies + (READ_ONCE(dlm_config.ci_toss_secs) * HZ);
326 }
327 
/*
 * Take an additional reference on @r.
 *
 * This is only called to add a reference when the code already holds
 * a valid reference to the rsb, so there's no need for locking.
 */

static inline void hold_rsb(struct dlm_rsb *r)
{
	/* inactive rsbs are not ref counted */
	WARN_ON(rsb_flag(r, RSB_INACTIVE));
	kref_get(&r->res_ref);
}
337 
/* Non-static wrapper around hold_rsb(). */
void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}
342 
343 /* TODO move this to lib/refcount.c */
344 static __must_check bool
345 dlm_refcount_dec_and_write_lock_bh(refcount_t *r, rwlock_t *lock)
346 __cond_acquires(lock)
347 {
348 	if (refcount_dec_not_one(r))
349 		return false;
350 
351 	write_lock_bh(lock);
352 	if (!refcount_dec_and_test(r)) {
353 		write_unlock_bh(lock);
354 		return false;
355 	}
356 
357 	return true;
358 }
359 
360 /* TODO move this to include/linux/kref.h */
361 static inline int dlm_kref_put_write_lock_bh(struct kref *kref,
362 					     void (*release)(struct kref *kref),
363 					     rwlock_t *lock)
364 {
365 	if (dlm_refcount_dec_and_write_lock_bh(&kref->refcount, lock)) {
366 		release(kref);
367 		return 1;
368 	}
369 
370 	return 0;
371 }
372 
373 static void put_rsb(struct dlm_rsb *r)
374 {
375 	struct dlm_ls *ls = r->res_ls;
376 	int rv;
377 
378 	rv = dlm_kref_put_write_lock_bh(&r->res_ref, deactivate_rsb,
379 					&ls->ls_rsbtbl_lock);
380 	if (rv)
381 		write_unlock_bh(&ls->ls_rsbtbl_lock);
382 }
383 
/* Non-static wrapper around put_rsb(). */
void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}
388 
389 /* connected with timer_delete_sync() in dlm_ls_stop() to stop
390  * new timers when recovery is triggered and don't run them
391  * again until a resume_scan_timer() tries it again.
392  */
393 static void enable_scan_timer(struct dlm_ls *ls, unsigned long jiffies)
394 {
395 	if (!dlm_locking_stopped(ls))
396 		mod_timer(&ls->ls_scan_timer, jiffies);
397 }
398 
399 /* This function tries to resume the timer callback if a rsb
400  * is on the scan list and no timer is pending. It might that
401  * the first entry is on currently executed as timer callback
402  * but we don't care if a timer queued up again and does
403  * nothing. Should be a rare case.
404  */
405 void resume_scan_timer(struct dlm_ls *ls)
406 {
407 	struct dlm_rsb *r;
408 
409 	spin_lock_bh(&ls->ls_scan_lock);
410 	r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
411 				     res_scan_list);
412 	if (r && !timer_pending(&ls->ls_scan_timer))
413 		enable_scan_timer(ls, r->res_toss_time);
414 	spin_unlock_bh(&ls->ls_scan_lock);
415 }
416 
417 /* ls_rsbtbl_lock must be held */
418 
419 static void del_scan(struct dlm_ls *ls, struct dlm_rsb *r)
420 {
421 	struct dlm_rsb *first;
422 
423 	/* active rsbs should never be on the scan list */
424 	WARN_ON(!rsb_flag(r, RSB_INACTIVE));
425 
426 	spin_lock_bh(&ls->ls_scan_lock);
427 	r->res_toss_time = 0;
428 
429 	/* if the rsb is not queued do nothing */
430 	if (list_empty(&r->res_scan_list))
431 		goto out;
432 
433 	/* get the first element before delete */
434 	first = list_first_entry(&ls->ls_scan_list, struct dlm_rsb,
435 				 res_scan_list);
436 	list_del_init(&r->res_scan_list);
437 	/* check if the first element was the rsb we deleted */
438 	if (first == r) {
439 		/* try to get the new first element, if the list
440 		 * is empty now try to delete the timer, if we are
441 		 * too late we don't care.
442 		 *
443 		 * if the list isn't empty and a new first element got
444 		 * in place, set the new timer expire time.
445 		 */
446 		first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
447 						 res_scan_list);
448 		if (!first)
449 			timer_delete(&ls->ls_scan_timer);
450 		else
451 			enable_scan_timer(ls, first->res_toss_time);
452 	}
453 
454 out:
455 	spin_unlock_bh(&ls->ls_scan_lock);
456 }
457 
458 static void add_scan(struct dlm_ls *ls, struct dlm_rsb *r)
459 {
460 	int our_nodeid = dlm_our_nodeid();
461 	struct dlm_rsb *first;
462 
463 	/* A dir record for a remote master rsb should never be on the scan list. */
464 	WARN_ON(!dlm_no_directory(ls) &&
465 		(r->res_master_nodeid != our_nodeid) &&
466 		(dlm_dir_nodeid(r) == our_nodeid));
467 
468 	/* An active rsb should never be on the scan list. */
469 	WARN_ON(!rsb_flag(r, RSB_INACTIVE));
470 
471 	/* An rsb should not already be on the scan list. */
472 	WARN_ON(!list_empty(&r->res_scan_list));
473 
474 	spin_lock_bh(&ls->ls_scan_lock);
475 	/* set the new rsb absolute expire time in the rsb */
476 	r->res_toss_time = rsb_toss_jiffies();
477 	if (list_empty(&ls->ls_scan_list)) {
478 		/* if the queue is empty add the element and it's
479 		 * our new expire time
480 		 */
481 		list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
482 		enable_scan_timer(ls, r->res_toss_time);
483 	} else {
484 		/* try to get the maybe new first element and then add
485 		 * to this rsb with the oldest expire time to the end
486 		 * of the queue. If the list was empty before this
487 		 * rsb expire time is our next expiration if it wasn't
488 		 * the now new first elemet is our new expiration time
489 		 */
490 		first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
491 						 res_scan_list);
492 		list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
493 		if (!first)
494 			enable_scan_timer(ls, r->res_toss_time);
495 		else
496 			enable_scan_timer(ls, first->res_toss_time);
497 	}
498 	spin_unlock_bh(&ls->ls_scan_lock);
499 }
500 
/* Expiry time used to retry after lock contention in dlm_rsb_scan():
 * when a trylock fails there, the scan timer is re-armed to fire again
 * 250 ms from now.  If another mod_timer() call in between makes the
 * timer expire earlier we don't care; this only covers the unlikely
 * case that nothing else happened in the meantime.
 */
#define DLM_TOSS_TIMER_RETRY	(jiffies + msecs_to_jiffies(250))
507 
508 /* Called by lockspace scan_timer to free unused rsb's. */
509 
510 void dlm_rsb_scan(struct timer_list *timer)
511 {
512 	struct dlm_ls *ls = from_timer(ls, timer, ls_scan_timer);
513 	int our_nodeid = dlm_our_nodeid();
514 	struct dlm_rsb *r;
515 	int rv;
516 
517 	while (1) {
518 		/* interrupting point to leave iteration when
519 		 * recovery waits for timer_delete_sync(), recovery
520 		 * will take care to delete everything in scan list.
521 		 */
522 		if (dlm_locking_stopped(ls))
523 			break;
524 
525 		rv = spin_trylock(&ls->ls_scan_lock);
526 		if (!rv) {
527 			/* rearm again try timer */
528 			enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
529 			break;
530 		}
531 
532 		r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
533 					     res_scan_list);
534 		if (!r) {
535 			/* the next add_scan will enable the timer again */
536 			spin_unlock(&ls->ls_scan_lock);
537 			break;
538 		}
539 
540 		/*
541 		 * If the first rsb is not yet expired, then stop because the
542 		 * list is sorted with nearest expiration first.
543 		 */
544 		if (time_before(jiffies, r->res_toss_time)) {
545 			/* rearm with the next rsb to expire in the future */
546 			enable_scan_timer(ls, r->res_toss_time);
547 			spin_unlock(&ls->ls_scan_lock);
548 			break;
549 		}
550 
551 		/* in find_rsb_dir/nodir there is a reverse order of this
552 		 * lock, however this is only a trylock if we hit some
553 		 * possible contention we try it again.
554 		 */
555 		rv = write_trylock(&ls->ls_rsbtbl_lock);
556 		if (!rv) {
557 			spin_unlock(&ls->ls_scan_lock);
558 			/* rearm again try timer */
559 			enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
560 			break;
561 		}
562 
563 		list_del(&r->res_slow_list);
564 		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
565 				       dlm_rhash_rsb_params);
566 		rsb_clear_flag(r, RSB_HASHED);
567 
568 		/* ls_rsbtbl_lock is not needed when calling send_remove() */
569 		write_unlock(&ls->ls_rsbtbl_lock);
570 
571 		list_del_init(&r->res_scan_list);
572 		spin_unlock(&ls->ls_scan_lock);
573 
574 		/* An rsb that is a dir record for a remote master rsb
575 		 * cannot be removed, and should not have a timer enabled.
576 		 */
577 		WARN_ON(!dlm_no_directory(ls) &&
578 			(r->res_master_nodeid != our_nodeid) &&
579 			(dlm_dir_nodeid(r) == our_nodeid));
580 
581 		/* We're the master of this rsb but we're not
582 		 * the directory record, so we need to tell the
583 		 * dir node to remove the dir record
584 		 */
585 		if (!dlm_no_directory(ls) &&
586 		    (r->res_master_nodeid == our_nodeid) &&
587 		    (dlm_dir_nodeid(r) != our_nodeid))
588 			send_remove(r);
589 
590 		free_inactive_rsb(r);
591 	}
592 }
593 
594 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
595    unlock any spinlocks, go back and call pre_rsb_struct again.
596    Otherwise, take an rsb off the list and return it. */
597 
598 static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
599 			  struct dlm_rsb **r_ret)
600 {
601 	struct dlm_rsb *r;
602 
603 	r = dlm_allocate_rsb();
604 	if (!r)
605 		return -ENOMEM;
606 
607 	r->res_ls = ls;
608 	r->res_length = len;
609 	memcpy(r->res_name, name, len);
610 	spin_lock_init(&r->res_lock);
611 
612 	INIT_LIST_HEAD(&r->res_lookup);
613 	INIT_LIST_HEAD(&r->res_grantqueue);
614 	INIT_LIST_HEAD(&r->res_convertqueue);
615 	INIT_LIST_HEAD(&r->res_waitqueue);
616 	INIT_LIST_HEAD(&r->res_root_list);
617 	INIT_LIST_HEAD(&r->res_scan_list);
618 	INIT_LIST_HEAD(&r->res_recover_list);
619 	INIT_LIST_HEAD(&r->res_masters_list);
620 
621 	*r_ret = r;
622 	return 0;
623 }
624 
625 int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
626 			struct dlm_rsb **r_ret)
627 {
628 	char key[DLM_RESNAME_MAXLEN] = {};
629 
630 	memcpy(key, name, len);
631 	*r_ret = rhashtable_lookup_fast(rhash, &key, dlm_rhash_rsb_params);
632 	if (*r_ret)
633 		return 0;
634 
635 	return -EBADR;
636 }
637 
638 static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
639 {
640 	int rv;
641 
642 	rv = rhashtable_insert_fast(rhash, &rsb->res_node,
643 				    dlm_rhash_rsb_params);
644 	if (!rv)
645 		rsb_set_flag(rsb, RSB_HASHED);
646 
647 	return rv;
648 }
649 
650 /*
651  * Find rsb in rsbtbl and potentially create/add one
652  *
653  * Delaying the release of rsb's has a similar benefit to applications keeping
654  * NL locks on an rsb, but without the guarantee that the cached master value
655  * will still be valid when the rsb is reused.  Apps aren't always smart enough
656  * to keep NL locks on an rsb that they may lock again shortly; this can lead
657  * to excessive master lookups and removals if we don't delay the release.
658  *
659  * Searching for an rsb means looking through both the normal list and toss
660  * list.  When found on the toss list the rsb is moved to the normal list with
661  * ref count of 1; when found on normal list the ref count is incremented.
662  *
663  * rsb's on the keep list are being used locally and refcounted.
664  * rsb's on the toss list are not being used locally, and are not refcounted.
665  *
666  * The toss list rsb's were either
667  * - previously used locally but not any more (were on keep list, then
668  *   moved to toss list when last refcount dropped)
669  * - created and put on toss list as a directory record for a lookup
670  *   (we are the dir node for the res, but are not using the res right now,
671  *   but some other node is)
672  *
673  * The purpose of find_rsb() is to return a refcounted rsb for local use.
674  * So, if the given rsb is on the toss list, it is moved to the keep list
675  * before being returned.
676  *
677  * deactivate_rsb() happens when all local usage of the rsb is done, i.e. no
678  * more refcounts exist, so the rsb is moved from the keep list to the
679  * toss list.
680  *
681  * rsb's on both keep and toss lists are used for doing a name to master
682  * lookups.  rsb's that are in use locally (and being refcounted) are on
683  * the keep list, rsb's that are not in use locally (not refcounted) and
684  * only exist for name/master lookups are on the toss list.
685  *
 * rsb's on the toss list whose dir_nodeid is not local can have stale
687  * name/master mappings.  So, remote requests on such rsb's can potentially
688  * return with an error, which means the mapping is stale and needs to
689  * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
690  * first_lkid is to keep only a single outstanding request on an rsb
691  * while that rsb has a potentially stale master.)
692  */
693 
694 static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
695 			uint32_t hash, int dir_nodeid, int from_nodeid,
696 			unsigned int flags, struct dlm_rsb **r_ret)
697 {
698 	struct dlm_rsb *r = NULL;
699 	int our_nodeid = dlm_our_nodeid();
700 	int from_local = 0;
701 	int from_other = 0;
702 	int from_dir = 0;
703 	int create = 0;
704 	int error;
705 
706 	if (flags & R_RECEIVE_REQUEST) {
707 		if (from_nodeid == dir_nodeid)
708 			from_dir = 1;
709 		else
710 			from_other = 1;
711 	} else if (flags & R_REQUEST) {
712 		from_local = 1;
713 	}
714 
715 	/*
716 	 * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
717 	 * from_nodeid has sent us a lock in dlm_recover_locks, believing
718 	 * we're the new master.  Our local recovery may not have set
719 	 * res_master_nodeid to our_nodeid yet, so allow either.  Don't
720 	 * create the rsb; dlm_recover_process_copy() will handle EBADR
721 	 * by resending.
722 	 *
723 	 * If someone sends us a request, we are the dir node, and we do
724 	 * not find the rsb anywhere, then recreate it.  This happens if
725 	 * someone sends us a request after we have removed/freed an rsb.
726 	 * (They sent a request instead of lookup because they are using
727 	 * an rsb taken from their scan list.)
728 	 */
729 
730 	if (from_local || from_dir ||
731 	    (from_other && (dir_nodeid == our_nodeid))) {
732 		create = 1;
733 	}
734 
735  retry:
736 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
737 	if (error)
738 		goto do_new;
739 
740 	/* check if the rsb is active under read lock - likely path */
741 	read_lock_bh(&ls->ls_rsbtbl_lock);
742 	if (!rsb_flag(r, RSB_HASHED)) {
743 		read_unlock_bh(&ls->ls_rsbtbl_lock);
744 		error = -EBADR;
745 		goto do_new;
746 	}
747 
748 	/*
749 	 * rsb is active, so we can't check master_nodeid without lock_rsb.
750 	 */
751 
752 	if (rsb_flag(r, RSB_INACTIVE)) {
753 		read_unlock_bh(&ls->ls_rsbtbl_lock);
754 		goto do_inactive;
755 	}
756 
757 	kref_get(&r->res_ref);
758 	read_unlock_bh(&ls->ls_rsbtbl_lock);
759 	goto out;
760 
761 
762  do_inactive:
763 	write_lock_bh(&ls->ls_rsbtbl_lock);
764 
765 	/*
766 	 * The expectation here is that the rsb will have HASHED and
767 	 * INACTIVE flags set, and that the rsb can be moved from
768 	 * inactive back to active again.  However, between releasing
769 	 * the read lock and acquiring the write lock, this rsb could
770 	 * have been removed from rsbtbl, and had HASHED cleared, to
771 	 * be freed.  To deal with this case, we would normally need
772 	 * to repeat dlm_search_rsb_tree while holding the write lock,
773 	 * but rcu allows us to simply check the HASHED flag, because
774 	 * the rcu read lock means the rsb will not be freed yet.
775 	 * If the HASHED flag is not set, then the rsb is being freed,
776 	 * so we add a new rsb struct.  If the HASHED flag is set,
777 	 * and INACTIVE is not set, it means another thread has
778 	 * made the rsb active, as we're expecting to do here, and
779 	 * we just repeat the lookup (this will be very unlikely.)
780 	 */
781 	if (rsb_flag(r, RSB_HASHED)) {
782 		if (!rsb_flag(r, RSB_INACTIVE)) {
783 			write_unlock_bh(&ls->ls_rsbtbl_lock);
784 			goto retry;
785 		}
786 	} else {
787 		write_unlock_bh(&ls->ls_rsbtbl_lock);
788 		error = -EBADR;
789 		goto do_new;
790 	}
791 
792 	/*
793 	 * rsb found inactive (master_nodeid may be out of date unless
794 	 * we are the dir_nodeid or were the master)  No other thread
795 	 * is using this rsb because it's inactive, so we can
796 	 * look at or update res_master_nodeid without lock_rsb.
797 	 */
798 
799 	if ((r->res_master_nodeid != our_nodeid) && from_other) {
800 		/* our rsb was not master, and another node (not the dir node)
801 		   has sent us a request */
802 		log_debug(ls, "find_rsb inactive from_other %d master %d dir %d %s",
803 			  from_nodeid, r->res_master_nodeid, dir_nodeid,
804 			  r->res_name);
805 		write_unlock_bh(&ls->ls_rsbtbl_lock);
806 		error = -ENOTBLK;
807 		goto out;
808 	}
809 
810 	if ((r->res_master_nodeid != our_nodeid) && from_dir) {
811 		/* don't think this should ever happen */
812 		log_error(ls, "find_rsb inactive from_dir %d master %d",
813 			  from_nodeid, r->res_master_nodeid);
814 		dlm_print_rsb(r);
815 		/* fix it and go on */
816 		r->res_master_nodeid = our_nodeid;
817 		r->res_nodeid = 0;
818 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
819 		r->res_first_lkid = 0;
820 	}
821 
822 	if (from_local && (r->res_master_nodeid != our_nodeid)) {
823 		/* Because we have held no locks on this rsb,
824 		   res_master_nodeid could have become stale. */
825 		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
826 		r->res_first_lkid = 0;
827 	}
828 
829 	/* we always deactivate scan timer for the rsb, when
830 	 * we move it out of the inactive state as rsb state
831 	 * can be changed and scan timers are only for inactive
832 	 * rsbs.
833 	 */
834 	del_scan(ls, r);
835 	list_move(&r->res_slow_list, &ls->ls_slow_active);
836 	rsb_clear_flag(r, RSB_INACTIVE);
837 	kref_init(&r->res_ref); /* ref is now used in active state */
838 	write_unlock_bh(&ls->ls_rsbtbl_lock);
839 
840 	goto out;
841 
842 
843  do_new:
844 	/*
845 	 * rsb not found
846 	 */
847 
848 	if (error == -EBADR && !create)
849 		goto out;
850 
851 	error = get_rsb_struct(ls, name, len, &r);
852 	if (WARN_ON_ONCE(error))
853 		goto out;
854 
855 	r->res_hash = hash;
856 	r->res_dir_nodeid = dir_nodeid;
857 	kref_init(&r->res_ref);
858 
859 	if (from_dir) {
860 		/* want to see how often this happens */
861 		log_debug(ls, "find_rsb new from_dir %d recreate %s",
862 			  from_nodeid, r->res_name);
863 		r->res_master_nodeid = our_nodeid;
864 		r->res_nodeid = 0;
865 		goto out_add;
866 	}
867 
868 	if (from_other && (dir_nodeid != our_nodeid)) {
869 		/* should never happen */
870 		log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
871 			  from_nodeid, dir_nodeid, our_nodeid, r->res_name);
872 		dlm_free_rsb(r);
873 		r = NULL;
874 		error = -ENOTBLK;
875 		goto out;
876 	}
877 
878 	if (from_other) {
879 		log_debug(ls, "find_rsb new from_other %d dir %d %s",
880 			  from_nodeid, dir_nodeid, r->res_name);
881 	}
882 
883 	if (dir_nodeid == our_nodeid) {
884 		/* When we are the dir nodeid, we can set the master
885 		   node immediately */
886 		r->res_master_nodeid = our_nodeid;
887 		r->res_nodeid = 0;
888 	} else {
889 		/* set_master will send_lookup to dir_nodeid */
890 		r->res_master_nodeid = 0;
891 		r->res_nodeid = -1;
892 	}
893 
894  out_add:
895 
896 	write_lock_bh(&ls->ls_rsbtbl_lock);
897 	error = rsb_insert(r, &ls->ls_rsbtbl);
898 	if (error == -EEXIST) {
899 		/* somebody else was faster and it seems the
900 		 * rsb exists now, we do a whole relookup
901 		 */
902 		write_unlock_bh(&ls->ls_rsbtbl_lock);
903 		dlm_free_rsb(r);
904 		goto retry;
905 	} else if (!error) {
906 		list_add(&r->res_slow_list, &ls->ls_slow_active);
907 	}
908 	write_unlock_bh(&ls->ls_rsbtbl_lock);
909  out:
910 	*r_ret = r;
911 	return error;
912 }
913 
914 /* During recovery, other nodes can send us new MSTCPY locks (from
915    dlm_recover_locks) before we've made ourself master (in
916    dlm_recover_masters). */
917 
918 static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
919 			  uint32_t hash, int dir_nodeid, int from_nodeid,
920 			  unsigned int flags, struct dlm_rsb **r_ret)
921 {
922 	struct dlm_rsb *r = NULL;
923 	int our_nodeid = dlm_our_nodeid();
924 	int recover = (flags & R_RECEIVE_RECOVER);
925 	int error;
926 
927  retry:
928 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
929 	if (error)
930 		goto do_new;
931 
932 	/* check if the rsb is in active state under read lock - likely path */
933 	read_lock_bh(&ls->ls_rsbtbl_lock);
934 	if (!rsb_flag(r, RSB_HASHED)) {
935 		read_unlock_bh(&ls->ls_rsbtbl_lock);
936 		goto do_new;
937 	}
938 
939 	if (rsb_flag(r, RSB_INACTIVE)) {
940 		read_unlock_bh(&ls->ls_rsbtbl_lock);
941 		goto do_inactive;
942 	}
943 
944 	/*
945 	 * rsb is active, so we can't check master_nodeid without lock_rsb.
946 	 */
947 
948 	kref_get(&r->res_ref);
949 	read_unlock_bh(&ls->ls_rsbtbl_lock);
950 
951 	goto out;
952 
953 
954  do_inactive:
955 	write_lock_bh(&ls->ls_rsbtbl_lock);
956 
957 	/* See comment in find_rsb_dir. */
958 	if (rsb_flag(r, RSB_HASHED)) {
959 		if (!rsb_flag(r, RSB_INACTIVE)) {
960 			write_unlock_bh(&ls->ls_rsbtbl_lock);
961 			goto retry;
962 		}
963 	} else {
964 		write_unlock_bh(&ls->ls_rsbtbl_lock);
965 		goto do_new;
966 	}
967 
968 
969 	/*
970 	 * rsb found inactive. No other thread is using this rsb because
971 	 * it's inactive, so we can look at or update res_master_nodeid
972 	 * without lock_rsb.
973 	 */
974 
975 	if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
976 		/* our rsb is not master, and another node has sent us a
977 		   request; this should never happen */
978 		log_error(ls, "find_rsb inactive from_nodeid %d master %d dir %d",
979 			  from_nodeid, r->res_master_nodeid, dir_nodeid);
980 		dlm_print_rsb(r);
981 		write_unlock_bh(&ls->ls_rsbtbl_lock);
982 		error = -ENOTBLK;
983 		goto out;
984 	}
985 
986 	if (!recover && (r->res_master_nodeid != our_nodeid) &&
987 	    (dir_nodeid == our_nodeid)) {
988 		/* our rsb is not master, and we are dir; may as well fix it;
989 		   this should never happen */
990 		log_error(ls, "find_rsb inactive our %d master %d dir %d",
991 			  our_nodeid, r->res_master_nodeid, dir_nodeid);
992 		dlm_print_rsb(r);
993 		r->res_master_nodeid = our_nodeid;
994 		r->res_nodeid = 0;
995 	}
996 
997 	del_scan(ls, r);
998 	list_move(&r->res_slow_list, &ls->ls_slow_active);
999 	rsb_clear_flag(r, RSB_INACTIVE);
1000 	kref_init(&r->res_ref);
1001 	write_unlock_bh(&ls->ls_rsbtbl_lock);
1002 
1003 	goto out;
1004 
1005 
1006  do_new:
1007 	/*
1008 	 * rsb not found
1009 	 */
1010 
1011 	error = get_rsb_struct(ls, name, len, &r);
1012 	if (WARN_ON_ONCE(error))
1013 		goto out;
1014 
1015 	r->res_hash = hash;
1016 	r->res_dir_nodeid = dir_nodeid;
1017 	r->res_master_nodeid = dir_nodeid;
1018 	r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
1019 	kref_init(&r->res_ref);
1020 
1021 	write_lock_bh(&ls->ls_rsbtbl_lock);
1022 	error = rsb_insert(r, &ls->ls_rsbtbl);
1023 	if (error == -EEXIST) {
1024 		/* somebody else was faster and it seems the
1025 		 * rsb exists now, we do a whole relookup
1026 		 */
1027 		write_unlock_bh(&ls->ls_rsbtbl_lock);
1028 		dlm_free_rsb(r);
1029 		goto retry;
1030 	} else if (!error) {
1031 		list_add(&r->res_slow_list, &ls->ls_slow_active);
1032 	}
1033 	write_unlock_bh(&ls->ls_rsbtbl_lock);
1034 
1035  out:
1036 	*r_ret = r;
1037 	return error;
1038 }
1039 
1040 /*
1041  * rsb rcu usage
1042  *
1043  * While rcu read lock is held, the rsb cannot be freed,
1044  * which allows a lookup optimization.
1045  *
1046  * Two threads are accessing the same rsb concurrently,
1047  * the first (A) is trying to use the rsb, the second (B)
1048  * is trying to free the rsb.
1049  *
1050  * thread A                 thread B
1051  * (trying to use rsb)      (trying to free rsb)
1052  *
1053  * A1. rcu read lock
1054  * A2. rsbtbl read lock
1055  * A3. look up rsb in rsbtbl
1056  * A4. rsbtbl read unlock
1057  *                          B1. rsbtbl write lock
1058  *                          B2. look up rsb in rsbtbl
1059  *                          B3. remove rsb from rsbtbl
1060  *                          B4. clear rsb HASHED flag
1061  *                          B5. rsbtbl write unlock
1062  *                          B6. begin freeing rsb using rcu...
1063  *
1064  * (rsb is inactive, so try to make it active again)
1065  * A5. read rsb HASHED flag (safe because rsb is not freed yet)
 * A6. the rsb HASHED flag is not set, which means the rsb
 *     is being removed from rsbtbl and freed, so don't use it.
1068  * A7. rcu read unlock
1069  *
1070  *                          B7. ...finish freeing rsb using rcu
1071  * A8. create a new rsb
1072  *
1073  * Without the rcu optimization, steps A5-8 would need to do
1074  * an extra rsbtbl lookup:
1075  * A5. rsbtbl write lock
1076  * A6. look up rsb in rsbtbl, not found
1077  * A7. rsbtbl write unlock
1078  * A8. create a new rsb
1079  */
1080 
1081 static int find_rsb(struct dlm_ls *ls, const void *name, int len,
1082 		    int from_nodeid, unsigned int flags,
1083 		    struct dlm_rsb **r_ret)
1084 {
1085 	int dir_nodeid;
1086 	uint32_t hash;
1087 	int rv;
1088 
1089 	if (len > DLM_RESNAME_MAXLEN)
1090 		return -EINVAL;
1091 
1092 	hash = jhash(name, len, 0);
1093 	dir_nodeid = dlm_hash2nodeid(ls, hash);
1094 
1095 	rcu_read_lock();
1096 	if (dlm_no_directory(ls))
1097 		rv = find_rsb_nodir(ls, name, len, hash, dir_nodeid,
1098 				      from_nodeid, flags, r_ret);
1099 	else
1100 		rv = find_rsb_dir(ls, name, len, hash, dir_nodeid,
1101 				    from_nodeid, flags, r_ret);
1102 	rcu_read_unlock();
1103 	return rv;
1104 }
1105 
1106 /* we have received a request and found that res_master_nodeid != our_nodeid,
1107    so we need to return an error or make ourself the master */
1108 
1109 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
1110 				  int from_nodeid)
1111 {
1112 	if (dlm_no_directory(ls)) {
1113 		log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
1114 			  from_nodeid, r->res_master_nodeid,
1115 			  r->res_dir_nodeid);
1116 		dlm_print_rsb(r);
1117 		return -ENOTBLK;
1118 	}
1119 
1120 	if (from_nodeid != r->res_dir_nodeid) {
1121 		/* our rsb is not master, and another node (not the dir node)
1122 	   	   has sent us a request.  this is much more common when our
1123 	   	   master_nodeid is zero, so limit debug to non-zero.  */
1124 
1125 		if (r->res_master_nodeid) {
1126 			log_debug(ls, "validate master from_other %d master %d "
1127 				  "dir %d first %x %s", from_nodeid,
1128 				  r->res_master_nodeid, r->res_dir_nodeid,
1129 				  r->res_first_lkid, r->res_name);
1130 		}
1131 		return -ENOTBLK;
1132 	} else {
1133 		/* our rsb is not master, but the dir nodeid has sent us a
1134 	   	   request; this could happen with master 0 / res_nodeid -1 */
1135 
1136 		if (r->res_master_nodeid) {
1137 			log_error(ls, "validate master from_dir %d master %d "
1138 				  "first %x %s",
1139 				  from_nodeid, r->res_master_nodeid,
1140 				  r->res_first_lkid, r->res_name);
1141 		}
1142 
1143 		r->res_master_nodeid = dlm_our_nodeid();
1144 		r->res_nodeid = 0;
1145 		return 0;
1146 	}
1147 }
1148 
/*
 * Core of the master lookup for an rsb that already exists on this (dir)
 * node.  Applies a sequence of fix-ups to r->res_master_nodeid and
 * r->res_nodeid, then reports the resulting master through *r_nodeid and,
 * when result is non-NULL, stores DLM_LU_MATCH in *result.
 *
 * flags: DLM_LU_RECOVER_MASTER selects fix_master, DLM_LU_RECOVER_DIR
 * selects from_master.  is_inactive only affects error logging.
 *
 * Both callers in _dlm_master_lookup() serialize access to the rsb: with
 * lock_rsb() for an active rsb, with the rsbtbl write lock for an inactive
 * one.
 */
static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
				int from_nodeid, bool is_inactive, unsigned int flags,
				int *r_nodeid, int *result)
{
	int fix_master = (flags & DLM_LU_RECOVER_MASTER);
	int from_master = (flags & DLM_LU_RECOVER_DIR);

	if (r->res_dir_nodeid != our_nodeid) {
		/* should not happen, but may as well fix it and carry on */
		log_error(ls, "%s res_dir %d our %d %s", __func__,
			  r->res_dir_nodeid, our_nodeid, r->res_name);
		r->res_dir_nodeid = our_nodeid;
	}

	if (fix_master && r->res_master_nodeid && dlm_is_removed(ls, r->res_master_nodeid)) {
		/* Recovery uses this function to set a new master when
		 * the previous master failed.  Setting NEW_MASTER will
		 * force dlm_recover_masters to call recover_master on this
		 * rsb even though the res_nodeid is no longer removed.
		 */

		r->res_master_nodeid = from_nodeid;
		r->res_nodeid = from_nodeid;
		rsb_set_flag(r, RSB_NEW_MASTER);

		if (is_inactive) {
			/* I don't think we should ever find it inactive. */
			log_error(ls, "%s fix_master inactive", __func__);
			dlm_dump_rsb(r);
		}
	}

	if (from_master && (r->res_master_nodeid != from_nodeid)) {
		/* this will happen if from_nodeid became master during
		 * a previous recovery cycle, and we aborted the previous
		 * cycle before recovering this master value
		 */

		log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
			  __func__, from_nodeid, r->res_master_nodeid,
			  r->res_nodeid, r->res_first_lkid, r->res_name);

		if (r->res_master_nodeid == our_nodeid) {
			/* we believe we are master: keep our value and
			 * report it unchanged
			 */
			log_error(ls, "from_master %d our_master", from_nodeid);
			dlm_dump_rsb(r);
			goto ret_assign;
		}

		r->res_master_nodeid = from_nodeid;
		r->res_nodeid = from_nodeid;
		rsb_set_flag(r, RSB_NEW_MASTER);
	}

	if (!r->res_master_nodeid) {
		/* this will happen if recovery happens while we're looking
		 * up the master for this rsb
		 */

		log_debug(ls, "%s master 0 to %d first %x %s", __func__,
			  from_nodeid, r->res_first_lkid, r->res_name);
		r->res_master_nodeid = from_nodeid;
		r->res_nodeid = from_nodeid;
	}

	if (!from_master && !fix_master &&
	    (r->res_master_nodeid == from_nodeid)) {
		/* this can happen when the master sends remove, the dir node
		 * finds the rsb on the active list and ignores the remove,
		 * and the former master sends a lookup
		 */

		/* log only; no state is changed for this case */
		log_limit(ls, "%s from master %d flags %x first %x %s",
			  __func__, from_nodeid, flags, r->res_first_lkid,
			  r->res_name);
	}

 ret_assign:
	*r_nodeid = r->res_master_nodeid;
	if (result)
		*result = DLM_LU_MATCH;
}
1230 
1231 /*
1232  * We're the dir node for this res and another node wants to know the
1233  * master nodeid.  During normal operation (non recovery) this is only
1234  * called from receive_lookup(); master lookups when the local node is
1235  * the dir node are done by find_rsb().
1236  *
1237  * normal operation, we are the dir node for a resource
1238  * . _request_lock
1239  * . set_master
1240  * . send_lookup
1241  * . receive_lookup
1242  * . dlm_master_lookup flags 0
1243  *
1244  * recover directory, we are rebuilding dir for all resources
1245  * . dlm_recover_directory
1246  * . dlm_rcom_names
1247  *   remote node sends back the rsb names it is master of and we are dir of
1248  * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
1249  *   we either create new rsb setting remote node as master, or find existing
1250  *   rsb and set master to be the remote node.
1251  *
1252  * recover masters, we are finding the new master for resources
1253  * . dlm_recover_masters
1254  * . recover_master
1255  * . dlm_send_rcom_lookup
1256  * . receive_rcom_lookup
1257  * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
1258  */
1259 
/*
 * Look up (or create) the directory record for a resource name on behalf
 * of from_nodeid; called by dlm_master_lookup() with the rcu read lock held.
 *
 * Returns -EINVAL when the name is too long, when the request comes from
 * our own nodeid, or when we are not the directory node for the name's hash
 * (in which case *r_nodeid is set to -1).  Otherwise:
 *  - existing rsb: __dlm_master_lookup() fills *r_nodeid and sets *result
 *    (if non-NULL) to DLM_LU_MATCH; returns 0;
 *  - no rsb: a new inactive rsb with from_nodeid as master is inserted,
 *    *r_nodeid is from_nodeid and *result (if non-NULL) is DLM_LU_ADD;
 *    returns 0, or the get_rsb_struct() error if allocation failed.
 */
static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
			      int len, unsigned int flags, int *r_nodeid, int *result)
{
	struct dlm_rsb *r = NULL;
	uint32_t hash;
	int our_nodeid = dlm_our_nodeid();
	int dir_nodeid, error;

	if (len > DLM_RESNAME_MAXLEN)
		return -EINVAL;

	if (from_nodeid == our_nodeid) {
		log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
			  our_nodeid, flags);
		return -EINVAL;
	}

	hash = jhash(name, len, 0);
	dir_nodeid = dlm_hash2nodeid(ls, hash);
	if (dir_nodeid != our_nodeid) {
		log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
			  from_nodeid, dir_nodeid, our_nodeid, hash,
			  ls->ls_num_nodes);
		*r_nodeid = -1;
		return -EINVAL;
	}

 retry:
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
	if (error)
		goto not_found;

	/* check if the rsb is active under read lock - likely path */
	read_lock_bh(&ls->ls_rsbtbl_lock);
	if (!rsb_flag(r, RSB_HASHED)) {
		/* found, but already unhashed: treat it as not found */
		read_unlock_bh(&ls->ls_rsbtbl_lock);
		goto not_found;
	}

	if (rsb_flag(r, RSB_INACTIVE)) {
		read_unlock_bh(&ls->ls_rsbtbl_lock);
		goto do_inactive;
	}

	/* because the rsb is active, we need to lock_rsb before
	 * checking/changing res_master_nodeid
	 */

	hold_rsb(r);
	read_unlock_bh(&ls->ls_rsbtbl_lock);
	lock_rsb(r);

	__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
			    flags, r_nodeid, result);

	/* the rsb was active */
	unlock_rsb(r);
	put_rsb(r);

	return 0;

 do_inactive:
	/* unlikely path - check if still part of ls_rsbtbl */
	write_lock_bh(&ls->ls_rsbtbl_lock);

	/* see comment in find_rsb_dir */
	if (rsb_flag(r, RSB_HASHED)) {
		if (!rsb_flag(r, RSB_INACTIVE)) {
			write_unlock_bh(&ls->ls_rsbtbl_lock);
			/* something has changed, very unlikely but
			 * try again
			 */
			goto retry;
		}
	} else {
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		goto not_found;
	}

	/* because the rsb is inactive, it's not refcounted and lock_rsb
	   is not used, but is protected by the rsbtbl lock */

	__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
			    r_nodeid, result);

	/* A dir record rsb should never be on scan list.
	 * Except when we are the dir and master node.
	 * This function should only be called by the dir
	 * node.
	 */
	WARN_ON(!list_empty(&r->res_scan_list) &&
		r->res_master_nodeid != our_nodeid);

	write_unlock_bh(&ls->ls_rsbtbl_lock);

	return 0;

 not_found:
	error = get_rsb_struct(ls, name, len, &r);
	if (WARN_ON_ONCE(error))
		goto out;

	/* the new rsb only serves as the dir record: it starts out
	 * inactive with the requesting node recorded as master
	 */
	r->res_hash = hash;
	r->res_dir_nodeid = our_nodeid;
	r->res_master_nodeid = from_nodeid;
	r->res_nodeid = from_nodeid;
	rsb_set_flag(r, RSB_INACTIVE);

	write_lock_bh(&ls->ls_rsbtbl_lock);
	error = rsb_insert(r, &ls->ls_rsbtbl);
	if (error == -EEXIST) {
		/* somebody else was faster and it seems the
		 * rsb exists now, we do a whole relookup
		 */
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		dlm_free_rsb(r);
		goto retry;
	} else if (error) {
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		/* should never happen */
		/* NOTE(review): this retries without bound if rsb_insert()
		 * keeps failing - confirm that cannot happen in practice.
		 */
		dlm_free_rsb(r);
		goto retry;
	}

	list_add(&r->res_slow_list, &ls->ls_slow_inactive);
	write_unlock_bh(&ls->ls_rsbtbl_lock);

	if (result)
		*result = DLM_LU_ADD;
	*r_nodeid = from_nodeid;
 out:
	return error;
}
1393 
/*
 * Public entry point for a master lookup: runs _dlm_master_lookup() with
 * the rcu read lock held and returns its result unchanged.
 */
int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
		      int len, unsigned int flags, int *r_nodeid, int *result)
{
	int rv;
	rcu_read_lock();
	rv = _dlm_master_lookup(ls, from_nodeid, name, len, flags, r_nodeid, result);
	rcu_read_unlock();
	return rv;
}
1403 
1404 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1405 {
1406 	struct dlm_rsb *r;
1407 
1408 	read_lock_bh(&ls->ls_rsbtbl_lock);
1409 	list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
1410 		if (r->res_hash == hash)
1411 			dlm_dump_rsb(r);
1412 	}
1413 	read_unlock_bh(&ls->ls_rsbtbl_lock);
1414 }
1415 
1416 void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
1417 {
1418 	struct dlm_rsb *r = NULL;
1419 	int error;
1420 
1421 	rcu_read_lock();
1422 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1423 	if (!error)
1424 		goto out;
1425 
1426 	dlm_dump_rsb(r);
1427  out:
1428 	rcu_read_unlock();
1429 }
1430 
1431 static void deactivate_rsb(struct kref *kref)
1432 {
1433 	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1434 	struct dlm_ls *ls = r->res_ls;
1435 	int our_nodeid = dlm_our_nodeid();
1436 
1437 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1438 	rsb_set_flag(r, RSB_INACTIVE);
1439 	list_move(&r->res_slow_list, &ls->ls_slow_inactive);
1440 
1441 	/*
1442 	 * When the rsb becomes unused, there are two possibilities:
1443 	 * 1. Leave the inactive rsb in place (don't remove it).
1444 	 * 2. Add it to the scan list to be removed.
1445 	 *
1446 	 * 1 is done when the rsb is acting as the dir record
1447 	 * for a remotely mastered rsb.  The rsb must be left
1448 	 * in place as an inactive rsb to act as the dir record.
1449 	 *
1450 	 * 2 is done when a) the rsb is not the master and not the
1451 	 * dir record, b) when the rsb is both the master and the
1452 	 * dir record, c) when the rsb is master but not dir record.
1453 	 *
1454 	 * (If no directory is used, the rsb can always be removed.)
1455 	 */
1456 	if (dlm_no_directory(ls) ||
1457 	    (r->res_master_nodeid == our_nodeid ||
1458 	     dlm_dir_nodeid(r) != our_nodeid))
1459 		add_scan(ls, r);
1460 
1461 	if (r->res_lvbptr) {
1462 		dlm_free_lvb(r->res_lvbptr);
1463 		r->res_lvbptr = NULL;
1464 	}
1465 }
1466 
/*
 * Free an rsb that is expected to be inactive.
 *
 * Warns (once) if RSB_INACTIVE is not set, asserts that the rsb is no longer
 * linked on any of its lookup/queue/recovery/scan lists, then releases it
 * with dlm_free_rsb().
 */
void free_inactive_rsb(struct dlm_rsb *r)
{
	WARN_ON_ONCE(!rsb_flag(r, RSB_INACTIVE));

	/* every list the rsb can be linked on must already be empty */
	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_scan_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r););

	dlm_free_rsb(r);
}
1482 
1483 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1484    The rsb must exist as long as any lkb's for it do. */
1485 
/* Take a reference on the rsb and record it as the lkb's resource;
   undone by detach_lkb(). */
static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}
1491 
1492 static void detach_lkb(struct dlm_lkb *lkb)
1493 {
1494 	if (lkb->lkb_resource) {
1495 		put_rsb(lkb->lkb_resource);
1496 		lkb->lkb_resource = NULL;
1497 	}
1498 }
1499 
1500 static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
1501 		       unsigned long start, unsigned long end)
1502 {
1503 	struct xa_limit limit;
1504 	struct dlm_lkb *lkb;
1505 	int rv;
1506 
1507 	limit.max = end;
1508 	limit.min = start;
1509 
1510 	lkb = dlm_allocate_lkb();
1511 	if (!lkb)
1512 		return -ENOMEM;
1513 
1514 	lkb->lkb_last_bast_cb_mode = DLM_LOCK_IV;
1515 	lkb->lkb_last_cast_cb_mode = DLM_LOCK_IV;
1516 	lkb->lkb_last_cb_mode = DLM_LOCK_IV;
1517 	lkb->lkb_nodeid = -1;
1518 	lkb->lkb_grmode = DLM_LOCK_IV;
1519 	kref_init(&lkb->lkb_ref);
1520 	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1521 	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1522 
1523 	write_lock_bh(&ls->ls_lkbxa_lock);
1524 	rv = xa_alloc(&ls->ls_lkbxa, &lkb->lkb_id, lkb, limit, GFP_ATOMIC);
1525 	write_unlock_bh(&ls->ls_lkbxa_lock);
1526 
1527 	if (rv < 0) {
1528 		log_error(ls, "create_lkb xa error %d", rv);
1529 		dlm_free_lkb(lkb);
1530 		return rv;
1531 	}
1532 
1533 	*lkb_ret = lkb;
1534 	return 0;
1535 }
1536 
/* Create an lkb with an id anywhere in the range 1..ULONG_MAX as passed
   to _create_lkb(); returns _create_lkb()'s result. */
static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	return _create_lkb(ls, lkb_ret, 1, ULONG_MAX);
}
1541 
1542 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1543 {
1544 	struct dlm_lkb *lkb;
1545 
1546 	rcu_read_lock();
1547 	lkb = xa_load(&ls->ls_lkbxa, lkid);
1548 	if (lkb) {
1549 		/* check if lkb is still part of lkbxa under lkbxa_lock as
1550 		 * the lkb_ref is tight to the lkbxa data structure, see
1551 		 * __put_lkb().
1552 		 */
1553 		read_lock_bh(&ls->ls_lkbxa_lock);
1554 		if (kref_read(&lkb->lkb_ref))
1555 			kref_get(&lkb->lkb_ref);
1556 		else
1557 			lkb = NULL;
1558 		read_unlock_bh(&ls->ls_lkbxa_lock);
1559 	}
1560 	rcu_read_unlock();
1561 
1562 	*lkb_ret = lkb;
1563 	return lkb ? 0 : -ENOENT;
1564 }
1565 
/* kref release callback handed to dlm_kref_put_write_lock_bh() by
   __put_lkb().  It only asserts that the lkb has no status (lkb_status
   is zero); the actual teardown is done by __put_lkb(). */
static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}
1575 
1576 /* __put_lkb() is used when an lkb may not have an rsb attached to
1577    it so we need to provide the lockspace explicitly */
1578 
1579 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1580 {
1581 	uint32_t lkid = lkb->lkb_id;
1582 	int rv;
1583 
1584 	rv = dlm_kref_put_write_lock_bh(&lkb->lkb_ref, kill_lkb,
1585 					&ls->ls_lkbxa_lock);
1586 	if (rv) {
1587 		xa_erase(&ls->ls_lkbxa, lkid);
1588 		write_unlock_bh(&ls->ls_lkbxa_lock);
1589 
1590 		detach_lkb(lkb);
1591 
1592 		/* for local/process lkbs, lvbptr points to caller's lksb */
1593 		if (lkb->lkb_lvbptr && is_master_copy(lkb))
1594 			dlm_free_lvb(lkb->lkb_lvbptr);
1595 		dlm_free_lkb(lkb);
1596 	}
1597 
1598 	return rv;
1599 }
1600 
1601 int dlm_put_lkb(struct dlm_lkb *lkb)
1602 {
1603 	struct dlm_ls *ls;
1604 
1605 	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1606 	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1607 
1608 	ls = lkb->lkb_resource->res_ls;
1609 	return __put_lkb(ls, lkb);
1610 }
1611 
/* This is only called to add a reference when the code already holds
   a valid reference to the lkb, so there's no need for locking.
   It is a plain kref_get() on lkb_ref. */

static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}
1619 
/* kref release callback used by unhold_lkb().  unhold_lkb() must never
   drop the last reference, so reaching this callback is a bug: it
   asserts unconditionally after printing the lkb. */
static void unhold_lkb_assert(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	DLM_ASSERT(false, dlm_print_lkb(lkb););
}
1626 
/* This is called when we need to remove a reference and are certain
   it's not the last ref.  e.g. del_lkb is always called between a
   find_lkb/put_lkb and is always the inverse of a previous add_lkb.
   put_lkb would work fine, but would involve unnecessary locking.
   If it does turn out to be the last ref, unhold_lkb_assert() fires. */

static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	kref_put(&lkb->lkb_ref, unhold_lkb_assert);
}
1636 
1637 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1638 			    int mode)
1639 {
1640 	struct dlm_lkb *lkb = NULL, *iter;
1641 
1642 	list_for_each_entry(iter, head, lkb_statequeue)
1643 		if (iter->lkb_rqmode < mode) {
1644 			lkb = iter;
1645 			list_add_tail(new, &iter->lkb_statequeue);
1646 			break;
1647 		}
1648 
1649 	if (!lkb)
1650 		list_add_tail(new, head);
1651 }
1652 
1653 /* add/remove lkb to rsb's grant/convert/wait queue */
1654 
1655 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1656 {
1657 	kref_get(&lkb->lkb_ref);
1658 
1659 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1660 
1661 	lkb->lkb_timestamp = ktime_get();
1662 
1663 	lkb->lkb_status = status;
1664 
1665 	switch (status) {
1666 	case DLM_LKSTS_WAITING:
1667 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1668 			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1669 		else
1670 			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1671 		break;
1672 	case DLM_LKSTS_GRANTED:
1673 		/* convention says granted locks kept in order of grmode */
1674 		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1675 				lkb->lkb_grmode);
1676 		break;
1677 	case DLM_LKSTS_CONVERT:
1678 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1679 			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1680 		else
1681 			list_add_tail(&lkb->lkb_statequeue,
1682 				      &r->res_convertqueue);
1683 		break;
1684 	default:
1685 		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1686 	}
1687 }
1688 
/* Inverse of add_lkb(): clear the lkb's status, unlink it from whichever
   rsb queue it is on and drop the reference add_lkb() took.  The rsb
   argument is not used here. */
static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}
1695 
/* Move an lkb to the rsb queue for status 'sts': del_lkb() followed by
   add_lkb(). */
static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
}
1701 
1702 static int msg_reply_type(int mstype)
1703 {
1704 	switch (mstype) {
1705 	case DLM_MSG_REQUEST:
1706 		return DLM_MSG_REQUEST_REPLY;
1707 	case DLM_MSG_CONVERT:
1708 		return DLM_MSG_CONVERT_REPLY;
1709 	case DLM_MSG_UNLOCK:
1710 		return DLM_MSG_UNLOCK_REPLY;
1711 	case DLM_MSG_CANCEL:
1712 		return DLM_MSG_CANCEL_REPLY;
1713 	case DLM_MSG_LOOKUP:
1714 		return DLM_MSG_LOOKUP_REPLY;
1715 	}
1716 	return -1;
1717 }
1718 
1719 /* add/remove lkb from global waiters list of lkb's waiting for
1720    a reply from a remote node */
1721 
/*
 * Record that the lkb is waiting for a reply to a message of type mstype
 * sent to to_nodeid.  Runs under ls_waiters_lock.
 *
 * If the lkb is already waiting (lkb_wait_type set, or an overlapping
 * cancel is pending), only an unlock or cancel may be stacked on top: the
 * matching OVERLAP bit is set, and wait_count and the lkb refcount are
 * bumped.  Otherwise the lkb starts waiting: wait_count becomes 1, the wait
 * type/nodeid are recorded, a reference is taken and the lkb is put on
 * ls_waiters.
 */
static void add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;

	spin_lock_bh(&ls->ls_waiters_lock);
	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
		switch (mstype) {
		case DLM_MSG_UNLOCK:
			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
			break;
		case DLM_MSG_CANCEL:
			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
			break;
		default:
			/* should never happen as validate_lock_args() checks
			 * on lkb_wait_type and validate_unlock_args() only
			 * creates UNLOCK or CANCEL messages.
			 */
			WARN_ON_ONCE(1);
			goto out;
		}
		/* one more outstanding reply, with its own lkb reference */
		lkb->lkb_wait_count++;
		hold_lkb(lkb);

		log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
			  lkb->lkb_wait_count, dlm_iflags_val(lkb));
		goto out;
	}

	DLM_ASSERT(!lkb->lkb_wait_count,
		   dlm_print_lkb(lkb);
		   printk("wait_count %d\n", lkb->lkb_wait_count););

	lkb->lkb_wait_count++;
	lkb->lkb_wait_type = mstype;
	lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
	hold_lkb(lkb);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
	spin_unlock_bh(&ls->ls_waiters_lock);
}
1764 
/* We clear the RESEND flag because we might be taking an lkb off the waiters
   list as part of process_requestqueue (e.g. a lookup that has an optimized
   request reply on the requestqueue) between dlm_recover_waiters_pre() which
   set RESEND and dlm_recover_waiters_post() */

/*
 * Account for a reply of type mstype on a waiting lkb.  ms is the reply
 * message, or NULL when there is none (see remove_from_waiters()).
 * Locking is left to the callers.
 *
 * Returns 0 when one wait was retired: wait_count and the lkb refcount are
 * dropped, and the lkb leaves ls_waiters once wait_count reaches zero.
 * Returns -1 when the reply does not match any wait state (a stale cancel
 * reply, or no wait in progress at all); nothing is changed then.
 */
static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
				const struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int overlap_done = 0;

	if (mstype == DLM_MSG_UNLOCK_REPLY &&
	    test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
		log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
		overlap_done = 1;
		goto out_del;
	}

	if (mstype == DLM_MSG_CANCEL_REPLY &&
	    test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
		log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
		overlap_done = 1;
		goto out_del;
	}

	/* Cancel state was preemptively cleared by a successful convert,
	   see next comment, nothing to do. */

	if ((mstype == DLM_MSG_CANCEL_REPLY) &&
	    (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
		log_debug(ls, "remwait %x cancel_reply wait_type %d",
			  lkb->lkb_id, lkb->lkb_wait_type);
		return -1;
	}

	/* Remove for the convert reply, and preemptively remove for the
	   cancel reply.  A convert has been granted while there's still
	   an outstanding cancel on it (the cancel is moot and the result
	   in the cancel reply should be 0).  We preempt the cancel reply
	   because the app gets the convert result and then can follow up
	   with another op, like convert.  This subsequent op would see the
	   lingering state of the cancel and fail with -EBUSY. */

	if ((mstype == DLM_MSG_CONVERT_REPLY) &&
	    (lkb->lkb_wait_type == DLM_MSG_CONVERT) && ms && !ms->m_result &&
	    test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
		log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
			  lkb->lkb_id);
		/* retire the cancel's wait here; out_del retires the
		 * convert's wait
		 */
		lkb->lkb_wait_type = 0;
		lkb->lkb_wait_count--;
		unhold_lkb(lkb);
		goto out_del;
	}

	/* N.B. type of reply may not always correspond to type of original
	   msg due to lookup->request optimization, verify others? */

	if (lkb->lkb_wait_type) {
		lkb->lkb_wait_type = 0;
		goto out_del;
	}

	log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
		  lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
		  lkb->lkb_remid, mstype, dlm_iflags_val(lkb));
	return -1;

 out_del:
	/* the force-unlock/cancel has completed and we haven't recvd a reply
	   to the op that was in progress prior to the unlock/cancel; we
	   give up on any reply to the earlier op.  FIXME: not sure when/how
	   this would happen */

	if (overlap_done && lkb->lkb_wait_type) {
		log_error(ls, "remwait error %x reply %d wait_type %d overlap",
			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
		lkb->lkb_wait_count--;
		unhold_lkb(lkb);
		lkb->lkb_wait_type = 0;
	}

	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

	clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
	lkb->lkb_wait_count--;
	if (!lkb->lkb_wait_count)
		list_del_init(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
	return 0;
}
1855 
/* Locked wrapper around _remove_from_waiters() for callers that have no
   reply message: takes ls_waiters_lock, passes a NULL message and returns
   the helper's result. */
static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	spin_lock_bh(&ls->ls_waiters_lock);
	error = _remove_from_waiters(lkb, mstype, NULL);
	spin_unlock_bh(&ls->ls_waiters_lock);
	return error;
}
1866 
/* Handles situations where we might be processing a "fake" or "local" reply in
 * the recovery context, which stops any locking activity. Only debugfs might
 * change the lockspace waiters, but it will hold the recovery lock to ensure
 * that remove_from_waiters_ms() in the local case is the only user
 * manipulating the lockspace waiters in recovery context.
 */
1873 
1874 static int remove_from_waiters_ms(struct dlm_lkb *lkb,
1875 				  const struct dlm_message *ms, bool local)
1876 {
1877 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1878 	int error;
1879 
1880 	if (!local)
1881 		spin_lock_bh(&ls->ls_waiters_lock);
1882 	else
1883 		WARN_ON_ONCE(!rwsem_is_locked(&ls->ls_in_recovery) ||
1884 			     !dlm_locking_stopped(ls));
1885 	error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
1886 	if (!local)
1887 		spin_unlock_bh(&ls->ls_waiters_lock);
1888 	return error;
1889 }
1890 
1891 /* lkb is master or local copy */
1892 
/*
 * Apply the lvb operation that dlm_lvb_operations[] selects for the lkb's
 * (grmode, rqmode) pair: copy the rsb's lvb to the lkb, copy the lkb's lvb
 * to the rsb (or invalidate the rsb's lvb), or do nothing.
 *
 * Each copy is skipped silently when the lkb has no lvb buffer, when
 * DLM_LKF_VALBLK is not set, or when the rsb side has no buffer (and, for
 * the write direction, one cannot be allocated).  Unless the function
 * returned early, the rsb's RSB_VALNOTVALID state is finally mirrored into
 * the lkb's DLM_SBF_VALNOTVALID_BIT.
 */
static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int b, len = r->res_ls->ls_lvblen;

	/* b=1 lvb returned to caller
	   b=0 lvb written to rsb or invalidated
	   b=-1 do nothing */

	/* the table is indexed with mode + 1 in both dimensions */
	b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

	if (b == 1) {
		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			return;

		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
		lkb->lkb_lvbseq = r->res_lvbseq;

	} else if (b == 0) {
		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
			/* caller asked to invalidate the rsb's lvb */
			rsb_set_flag(r, RSB_VALNOTVALID);
			return;
		}

		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		/* the rsb's lvb buffer is allocated on first write */
		if (!r->res_lvbptr)
			r->res_lvbptr = dlm_allocate_lvb(r->res_ls);

		if (!r->res_lvbptr)
			return;

		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
		r->res_lvbseq++;
		lkb->lkb_lvbseq = r->res_lvbseq;
		rsb_clear_flag(r, RSB_VALNOTVALID);
	}

	if (rsb_flag(r, RSB_VALNOTVALID))
		set_bit(DLM_SBF_VALNOTVALID_BIT, &lkb->lkb_sbflags);
}
1943 
1944 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1945 {
1946 	if (lkb->lkb_grmode < DLM_LOCK_PW)
1947 		return;
1948 
1949 	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1950 		rsb_set_flag(r, RSB_VALNOTVALID);
1951 		return;
1952 	}
1953 
1954 	if (!lkb->lkb_lvbptr)
1955 		return;
1956 
1957 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1958 		return;
1959 
1960 	if (!r->res_lvbptr)
1961 		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1962 
1963 	if (!r->res_lvbptr)
1964 		return;
1965 
1966 	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1967 	r->res_lvbseq++;
1968 	rsb_clear_flag(r, RSB_VALNOTVALID);
1969 }
1970 
1971 /* lkb is process copy (pc) */
1972 
1973 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1974 			    const struct dlm_message *ms)
1975 {
1976 	int b;
1977 
1978 	if (!lkb->lkb_lvbptr)
1979 		return;
1980 
1981 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1982 		return;
1983 
1984 	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1985 	if (b == 1) {
1986 		int len = receive_extralen(ms);
1987 		if (len > r->res_ls->ls_lvblen)
1988 			len = r->res_ls->ls_lvblen;
1989 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1990 		lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
1991 	}
1992 }
1993 
1994 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1995    remove_lock -- used for unlock, removes lkb from granted
1996    revert_lock -- used for cancel, moves lkb from convert to granted
1997    grant_lock  -- used for request and convert, adds lkb to granted or
1998                   moves lkb from convert or waiting to granted
1999 
2000    Each of these is used for master or local copy lkb's.  There is
2001    also a _pc() variation used to make the corresponding change on
2002    a process copy (pc) lkb. */
2003 
2004 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2005 {
2006 	del_lkb(r, lkb);
2007 	lkb->lkb_grmode = DLM_LOCK_IV;
2008 	/* this unhold undoes the original ref from create_lkb()
2009 	   so this leads to the lkb being freed */
2010 	unhold_lkb(lkb);
2011 }
2012 
/* Unlock for a master or local copy lkb: the lvb is handled first,
   then the lkb is removed. */

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* must run before _remove_lock() resets lkb_grmode, which
	   set_lvb_unlock() checks */
	set_lvb_unlock(r, lkb);

	_remove_lock(r, lkb);
}
2018 
/* Unlock for a process copy lkb: no lvb handling is done here, the lkb
   is only removed. */

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}
2023 
2024 /* returns: 0 did nothing
2025 	    1 moved lock to granted
2026 	   -1 removed lock */
2027 
2028 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2029 {
2030 	int rv = 0;
2031 
2032 	lkb->lkb_rqmode = DLM_LOCK_IV;
2033 
2034 	switch (lkb->lkb_status) {
2035 	case DLM_LKSTS_GRANTED:
2036 		break;
2037 	case DLM_LKSTS_CONVERT:
2038 		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2039 		rv = 1;
2040 		break;
2041 	case DLM_LKSTS_WAITING:
2042 		del_lkb(r, lkb);
2043 		lkb->lkb_grmode = DLM_LOCK_IV;
2044 		/* this unhold undoes the original ref from create_lkb()
2045 		   so this leads to the lkb being freed */
2046 		unhold_lkb(lkb);
2047 		rv = -1;
2048 		break;
2049 	default:
2050 		log_print("invalid status for revert %d", lkb->lkb_status);
2051 	}
2052 	return rv;
2053 }
2054 
/* Process copy variant of revert_lock(); the handling is identical, so
   the result of revert_lock() is passed straight through. */

static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int moved = revert_lock(r, lkb);

	return moved;
}
2059 
2060 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2061 {
2062 	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2063 		lkb->lkb_grmode = lkb->lkb_rqmode;
2064 		if (lkb->lkb_status)
2065 			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2066 		else
2067 			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2068 	}
2069 
2070 	lkb->lkb_rqmode = DLM_LOCK_IV;
2071 	lkb->lkb_highbast = 0;
2072 }
2073 
/* Grant for a master or local copy lkb: the lvb is handled first, then
   the lkb is granted. */

static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* must run before _grant_lock() changes the modes, which
	   set_lvb_lock() uses to pick the lvb operation */
	set_lvb_lock(r, lkb);

	_grant_lock(r, lkb);
}
2079 
/* Grant for a process copy lkb: any lvb is taken from the message ms
   first, then the lkb is granted. */

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  const struct dlm_message *ms)
{
	/* must run before _grant_lock() changes the modes, which
	   set_lvb_lock_pc() uses to pick the lvb operation */
	set_lvb_lock_pc(r, lkb, ms);

	_grant_lock(r, lkb);
}
2086 
2087 /* called by grant_pending_locks() which means an async grant message must
2088    be sent to the requesting node in addition to granting the lock if the
2089    lkb belongs to a remote node. */
2090 
static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);

	/* a lock that is not a master copy gets its completion callback
	   queued here; a master copy has a grant message sent instead */
	if (!is_master_copy(lkb))
		queue_cast(r, lkb, 0);
	else
		send_grant(r, lkb);
}
2099 
2100 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2101    change the granted/requested modes.  We're munging things accordingly in
2102    the process copy.
2103    CONVDEADLK: our grmode may have been forced down to NL to resolve a
2104    conversion deadlock
2105    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2106    compatible with other granted locks */
2107 
2108 static void munge_demoted(struct dlm_lkb *lkb)
2109 {
2110 	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2111 		log_print("munge_demoted %x invalid modes gr %d rq %d",
2112 			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2113 		return;
2114 	}
2115 
2116 	lkb->lkb_grmode = DLM_LOCK_NL;
2117 }
2118 
2119 static void munge_altmode(struct dlm_lkb *lkb, const struct dlm_message *ms)
2120 {
2121 	if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
2122 	    ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
2123 		log_print("munge_altmode %x invalid reply type %d",
2124 			  lkb->lkb_id, le32_to_cpu(ms->m_type));
2125 		return;
2126 	}
2127 
2128 	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2129 		lkb->lkb_rqmode = DLM_LOCK_PR;
2130 	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2131 		lkb->lkb_rqmode = DLM_LOCK_CW;
2132 	else {
2133 		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2134 		dlm_print_lkb(lkb);
2135 	}
2136 }
2137 
2138 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2139 {
2140 	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2141 					   lkb_statequeue);
2142 	if (lkb->lkb_id == first->lkb_id)
2143 		return 1;
2144 
2145 	return 0;
2146 }
2147 
2148 /* Check if the given lkb conflicts with another lkb on the queue. */
2149 
2150 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2151 {
2152 	struct dlm_lkb *this;
2153 
2154 	list_for_each_entry(this, head, lkb_statequeue) {
2155 		if (this == lkb)
2156 			continue;
2157 		if (!modes_compat(this, lkb))
2158 			return 1;
2159 	}
2160 	return 0;
2161 }
2162 
2163 /*
2164  * "A conversion deadlock arises with a pair of lock requests in the converting
2165  * queue for one resource.  The granted mode of each lock blocks the requested
2166  * mode of the other lock."
2167  *
2168  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2169  * convert queue from being granted, then deadlk/demote lkb.
2170  *
2171  * Example:
2172  * Granted Queue: empty
2173  * Convert Queue: NL->EX (first lock)
2174  *                PR->EX (second lock)
2175  *
2176  * The first lock can't be granted because of the granted mode of the second
2177  * lock and the second lock can't be granted because it's not first in the
2178  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2179  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2180  * flag set and return DEMOTED in the lksb flags.
2181  *
2182  * Originally, this function detected conv-deadlk in a more limited scope:
2183  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2184  * - if lkb1 was the first entry in the queue (not just earlier), and was
2185  *   blocked by the granted mode of lkb2, and there was nothing on the
2186  *   granted queue preventing lkb1 from being granted immediately, i.e.
2187  *   lkb2 was the only thing preventing lkb1 from being granted.
2188  *
2189  * That second condition meant we'd only say there was conv-deadlk if
2190  * resolving it (by demotion) would lead to the first lock on the convert
2191  * queue being granted right away.  It allowed conversion deadlocks to exist
2192  * between locks on the convert queue while they couldn't be granted anyway.
2193  *
2194  * Now, we detect and take action on conversion deadlocks immediately when
2195  * they're created, even if they may not be immediately consequential.  If
2196  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2197  * mode that would prevent lkb1's conversion from being granted, we do a
2198  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2199  * I think this means that the lkb_is_ahead condition below should always
2200  * be zero, i.e. there will never be conv-deadlk between two locks that are
2201  * both already on the convert queue.
2202  */
2203 
2204 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2205 {
2206 	struct dlm_lkb *lkb1;
2207 	int lkb_is_ahead = 0;
2208 
2209 	list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2210 		if (lkb1 == lkb2) {
2211 			lkb_is_ahead = 1;
2212 			continue;
2213 		}
2214 
2215 		if (!lkb_is_ahead) {
2216 			if (!modes_compat(lkb2, lkb1))
2217 				return 1;
2218 		} else {
2219 			if (!modes_compat(lkb2, lkb1) &&
2220 			    !modes_compat(lkb1, lkb2))
2221 				return 1;
2222 		}
2223 	}
2224 	return 0;
2225 }
2226 
2227 /*
2228  * Return 1 if the lock can be granted, 0 otherwise.
2229  * Also detect and resolve conversion deadlocks.
2230  *
2231  * lkb is the lock to be granted
2232  *
2233  * now is 1 if the function is being called in the context of the
2234  * immediate request, it is 0 if called later, after the lock has been
2235  * queued.
2236  *
2237  * recover is 1 if dlm_recover_grant() is trying to grant conversions
2238  * after recovery.
2239  *
2240  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2241  */
2242 
2243 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2244 			   int recover)
2245 {
2246 	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2247 
2248 	/*
2249 	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
2250 	 * a new request for a NL mode lock being blocked.
2251 	 *
2252 	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2253 	 * request, then it would be granted.  In essence, the use of this flag
2254 	 * tells the Lock Manager to expedite theis request by not considering
2255 	 * what may be in the CONVERTING or WAITING queues...  As of this
2256 	 * writing, the EXPEDITE flag can be used only with new requests for NL
2257 	 * mode locks.  This flag is not valid for conversion requests.
2258 	 *
2259 	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
2260 	 * conversion or used with a non-NL requested mode.  We also know an
2261 	 * EXPEDITE request is always granted immediately, so now must always
2262 	 * be 1.  The full condition to grant an expedite request: (now &&
2263 	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2264 	 * therefore be shortened to just checking the flag.
2265 	 */
2266 
2267 	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2268 		return 1;
2269 
2270 	/*
2271 	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2272 	 * added to the remaining conditions.
2273 	 */
2274 
2275 	if (queue_conflict(&r->res_grantqueue, lkb))
2276 		return 0;
2277 
2278 	/*
2279 	 * 6-3: By default, a conversion request is immediately granted if the
2280 	 * requested mode is compatible with the modes of all other granted
2281 	 * locks
2282 	 */
2283 
2284 	if (queue_conflict(&r->res_convertqueue, lkb))
2285 		return 0;
2286 
2287 	/*
2288 	 * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2289 	 * locks for a recovered rsb, on which lkb's have been rebuilt.
2290 	 * The lkb's may have been rebuilt on the queues in a different
2291 	 * order than they were in on the previous master.  So, granting
2292 	 * queued conversions in order after recovery doesn't make sense
2293 	 * since the order hasn't been preserved anyway.  The new order
2294 	 * could also have created a new "in place" conversion deadlock.
2295 	 * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2296 	 * After recovery, there would be no granted locks, and possibly
2297 	 * NL->EX, PR->EX, an in-place conversion deadlock.)  So, after
2298 	 * recovery, grant conversions without considering order.
2299 	 */
2300 
2301 	if (conv && recover)
2302 		return 1;
2303 
2304 	/*
2305 	 * 6-5: But the default algorithm for deciding whether to grant or
2306 	 * queue conversion requests does not by itself guarantee that such
2307 	 * requests are serviced on a "first come first serve" basis.  This, in
2308 	 * turn, can lead to a phenomenon known as "indefinate postponement".
2309 	 *
2310 	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
2311 	 * the system service employed to request a lock conversion.  This flag
2312 	 * forces certain conversion requests to be queued, even if they are
2313 	 * compatible with the granted modes of other locks on the same
2314 	 * resource.  Thus, the use of this flag results in conversion requests
2315 	 * being ordered on a "first come first servce" basis.
2316 	 *
2317 	 * DCT: This condition is all about new conversions being able to occur
2318 	 * "in place" while the lock remains on the granted queue (assuming
2319 	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
2320 	 * doesn't _have_ to go onto the convert queue where it's processed in
2321 	 * order.  The "now" variable is necessary to distinguish converts
2322 	 * being received and processed for the first time now, because once a
2323 	 * convert is moved to the conversion queue the condition below applies
2324 	 * requiring fifo granting.
2325 	 */
2326 
2327 	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2328 		return 1;
2329 
2330 	/*
2331 	 * Even if the convert is compat with all granted locks,
2332 	 * QUECVT forces it behind other locks on the convert queue.
2333 	 */
2334 
2335 	if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2336 		if (list_empty(&r->res_convertqueue))
2337 			return 1;
2338 		else
2339 			return 0;
2340 	}
2341 
2342 	/*
2343 	 * The NOORDER flag is set to avoid the standard vms rules on grant
2344 	 * order.
2345 	 */
2346 
2347 	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2348 		return 1;
2349 
2350 	/*
2351 	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2352 	 * granted until all other conversion requests ahead of it are granted
2353 	 * and/or canceled.
2354 	 */
2355 
2356 	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2357 		return 1;
2358 
2359 	/*
2360 	 * 6-4: By default, a new request is immediately granted only if all
2361 	 * three of the following conditions are satisfied when the request is
2362 	 * issued:
2363 	 * - The queue of ungranted conversion requests for the resource is
2364 	 *   empty.
2365 	 * - The queue of ungranted new requests for the resource is empty.
2366 	 * - The mode of the new request is compatible with the most
2367 	 *   restrictive mode of all granted locks on the resource.
2368 	 */
2369 
2370 	if (now && !conv && list_empty(&r->res_convertqueue) &&
2371 	    list_empty(&r->res_waitqueue))
2372 		return 1;
2373 
2374 	/*
2375 	 * 6-4: Once a lock request is in the queue of ungranted new requests,
2376 	 * it cannot be granted until the queue of ungranted conversion
2377 	 * requests is empty, all ungranted new requests ahead of it are
2378 	 * granted and/or canceled, and it is compatible with the granted mode
2379 	 * of the most restrictive lock granted on the resource.
2380 	 */
2381 
2382 	if (!now && !conv && list_empty(&r->res_convertqueue) &&
2383 	    first_in_list(lkb, &r->res_waitqueue))
2384 		return 1;
2385 
2386 	return 0;
2387 }
2388 
2389 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2390 			  int recover, int *err)
2391 {
2392 	int rv;
2393 	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2394 	int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2395 
2396 	if (err)
2397 		*err = 0;
2398 
2399 	rv = _can_be_granted(r, lkb, now, recover);
2400 	if (rv)
2401 		goto out;
2402 
2403 	/*
2404 	 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2405 	 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2406 	 * cancels one of the locks.
2407 	 */
2408 
2409 	if (is_convert && can_be_queued(lkb) &&
2410 	    conversion_deadlock_detect(r, lkb)) {
2411 		if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2412 			lkb->lkb_grmode = DLM_LOCK_NL;
2413 			set_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
2414 		} else if (err) {
2415 			*err = -EDEADLK;
2416 		} else {
2417 			log_print("can_be_granted deadlock %x now %d",
2418 				  lkb->lkb_id, now);
2419 			dlm_dump_rsb(r);
2420 		}
2421 		goto out;
2422 	}
2423 
2424 	/*
2425 	 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2426 	 * to grant a request in a mode other than the normal rqmode.  It's a
2427 	 * simple way to provide a big optimization to applications that can
2428 	 * use them.
2429 	 */
2430 
2431 	if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2432 		alt = DLM_LOCK_PR;
2433 	else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2434 		alt = DLM_LOCK_CW;
2435 
2436 	if (alt) {
2437 		lkb->lkb_rqmode = alt;
2438 		rv = _can_be_granted(r, lkb, now, 0);
2439 		if (rv)
2440 			set_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
2441 		else
2442 			lkb->lkb_rqmode = rqmode;
2443 	}
2444  out:
2445 	return rv;
2446 }
2447 
2448 /* Returns the highest requested mode of all blocked conversions; sets
2449    cw if there's a blocked conversion to DLM_LOCK_CW. */
2450 
2451 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2452 				 unsigned int *count)
2453 {
2454 	struct dlm_lkb *lkb, *s;
2455 	int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2456 	int hi, demoted, quit, grant_restart, demote_restart;
2457 	int deadlk;
2458 
2459 	quit = 0;
2460  restart:
2461 	grant_restart = 0;
2462 	demote_restart = 0;
2463 	hi = DLM_LOCK_IV;
2464 
2465 	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2466 		demoted = is_demoted(lkb);
2467 		deadlk = 0;
2468 
2469 		if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2470 			grant_lock_pending(r, lkb);
2471 			grant_restart = 1;
2472 			if (count)
2473 				(*count)++;
2474 			continue;
2475 		}
2476 
2477 		if (!demoted && is_demoted(lkb)) {
2478 			log_print("WARN: pending demoted %x node %d %s",
2479 				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2480 			demote_restart = 1;
2481 			continue;
2482 		}
2483 
2484 		if (deadlk) {
2485 			/*
2486 			 * If DLM_LKB_NODLKWT flag is set and conversion
2487 			 * deadlock is detected, we request blocking AST and
2488 			 * down (or cancel) conversion.
2489 			 */
2490 			if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2491 				if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2492 					queue_bast(r, lkb, lkb->lkb_rqmode);
2493 					lkb->lkb_highbast = lkb->lkb_rqmode;
2494 				}
2495 			} else {
2496 				log_print("WARN: pending deadlock %x node %d %s",
2497 					  lkb->lkb_id, lkb->lkb_nodeid,
2498 					  r->res_name);
2499 				dlm_dump_rsb(r);
2500 			}
2501 			continue;
2502 		}
2503 
2504 		hi = max_t(int, lkb->lkb_rqmode, hi);
2505 
2506 		if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2507 			*cw = 1;
2508 	}
2509 
2510 	if (grant_restart)
2511 		goto restart;
2512 	if (demote_restart && !quit) {
2513 		quit = 1;
2514 		goto restart;
2515 	}
2516 
2517 	return max_t(int, high, hi);
2518 }
2519 
2520 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2521 			      unsigned int *count)
2522 {
2523 	struct dlm_lkb *lkb, *s;
2524 
2525 	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2526 		if (can_be_granted(r, lkb, 0, 0, NULL)) {
2527 			grant_lock_pending(r, lkb);
2528 			if (count)
2529 				(*count)++;
2530 		} else {
2531 			high = max_t(int, lkb->lkb_rqmode, high);
2532 			if (lkb->lkb_rqmode == DLM_LOCK_CW)
2533 				*cw = 1;
2534 		}
2535 	}
2536 
2537 	return high;
2538 }
2539 
2540 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2541    on either the convert or waiting queue.
2542    high is the largest rqmode of all locks blocked on the convert or
2543    waiting queue. */
2544 
2545 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2546 {
2547 	if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2548 		if (gr->lkb_highbast < DLM_LOCK_EX)
2549 			return 1;
2550 		return 0;
2551 	}
2552 
2553 	if (gr->lkb_highbast < high &&
2554 	    !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2555 		return 1;
2556 	return 0;
2557 }
2558 
2559 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2560 {
2561 	struct dlm_lkb *lkb, *s;
2562 	int high = DLM_LOCK_IV;
2563 	int cw = 0;
2564 
2565 	if (!is_master(r)) {
2566 		log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2567 		dlm_dump_rsb(r);
2568 		return;
2569 	}
2570 
2571 	high = grant_pending_convert(r, high, &cw, count);
2572 	high = grant_pending_wait(r, high, &cw, count);
2573 
2574 	if (high == DLM_LOCK_IV)
2575 		return;
2576 
2577 	/*
2578 	 * If there are locks left on the wait/convert queue then send blocking
2579 	 * ASTs to granted locks based on the largest requested mode (high)
2580 	 * found above.
2581 	 */
2582 
2583 	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2584 		if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2585 			if (cw && high == DLM_LOCK_PR &&
2586 			    lkb->lkb_grmode == DLM_LOCK_PR)
2587 				queue_bast(r, lkb, DLM_LOCK_CW);
2588 			else
2589 				queue_bast(r, lkb, high);
2590 			lkb->lkb_highbast = high;
2591 		}
2592 	}
2593 }
2594 
2595 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2596 {
2597 	if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2598 	    (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2599 		if (gr->lkb_highbast < DLM_LOCK_EX)
2600 			return 1;
2601 		return 0;
2602 	}
2603 
2604 	if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2605 		return 1;
2606 	return 0;
2607 }
2608 
2609 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2610 			    struct dlm_lkb *lkb)
2611 {
2612 	struct dlm_lkb *gr;
2613 
2614 	list_for_each_entry(gr, head, lkb_statequeue) {
2615 		/* skip self when sending basts to convertqueue */
2616 		if (gr == lkb)
2617 			continue;
2618 		if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2619 			queue_bast(r, gr, lkb->lkb_rqmode);
2620 			gr->lkb_highbast = lkb->lkb_rqmode;
2621 		}
2622 	}
2623 }
2624 
2625 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2626 {
2627 	send_bast_queue(r, &r->res_grantqueue, lkb);
2628 }
2629 
2630 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2631 {
2632 	send_bast_queue(r, &r->res_grantqueue, lkb);
2633 	send_bast_queue(r, &r->res_convertqueue, lkb);
2634 }
2635 
2636 /* set_master(r, lkb) -- set the master nodeid of a resource
2637 
2638    The purpose of this function is to set the nodeid field in the given
2639    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2640    known, it can just be copied to the lkb and the function will return
2641    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2642    before it can be copied to the lkb.
2643 
2644    When the rsb nodeid is being looked up remotely, the initial lkb
2645    causing the lookup is kept on the ls_waiters list waiting for the
2646    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2647    on the rsb's res_lookup list until the master is verified.
2648 
2649    Return values:
2650    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2651    1: the rsb master is not available and the lkb has been placed on
2652       a wait queue
2653 */
2654 
2655 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2656 {
2657 	int our_nodeid = dlm_our_nodeid();
2658 
2659 	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2660 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2661 		r->res_first_lkid = lkb->lkb_id;
2662 		lkb->lkb_nodeid = r->res_nodeid;
2663 		return 0;
2664 	}
2665 
2666 	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2667 		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2668 		return 1;
2669 	}
2670 
2671 	if (r->res_master_nodeid == our_nodeid) {
2672 		lkb->lkb_nodeid = 0;
2673 		return 0;
2674 	}
2675 
2676 	if (r->res_master_nodeid) {
2677 		lkb->lkb_nodeid = r->res_master_nodeid;
2678 		return 0;
2679 	}
2680 
2681 	if (dlm_dir_nodeid(r) == our_nodeid) {
2682 		/* This is a somewhat unusual case; find_rsb will usually
2683 		   have set res_master_nodeid when dir nodeid is local, but
2684 		   there are cases where we become the dir node after we've
2685 		   past find_rsb and go through _request_lock again.
2686 		   confirm_master() or process_lookup_list() needs to be
2687 		   called after this. */
2688 		log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2689 			  lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2690 			  r->res_name);
2691 		r->res_master_nodeid = our_nodeid;
2692 		r->res_nodeid = 0;
2693 		lkb->lkb_nodeid = 0;
2694 		return 0;
2695 	}
2696 
2697 	r->res_first_lkid = lkb->lkb_id;
2698 	send_lookup(r, lkb);
2699 	return 1;
2700 }
2701 
2702 static void process_lookup_list(struct dlm_rsb *r)
2703 {
2704 	struct dlm_lkb *lkb, *safe;
2705 
2706 	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2707 		list_del_init(&lkb->lkb_rsb_lookup);
2708 		_request_lock(r, lkb);
2709 	}
2710 }
2711 
2712 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2713 
2714 static void confirm_master(struct dlm_rsb *r, int error)
2715 {
2716 	struct dlm_lkb *lkb;
2717 
2718 	if (!r->res_first_lkid)
2719 		return;
2720 
2721 	switch (error) {
2722 	case 0:
2723 	case -EINPROGRESS:
2724 		r->res_first_lkid = 0;
2725 		process_lookup_list(r);
2726 		break;
2727 
2728 	case -EAGAIN:
2729 	case -EBADR:
2730 	case -ENOTBLK:
2731 		/* the remote request failed and won't be retried (it was
2732 		   a NOQUEUE, or has been canceled/unlocked); make a waiting
2733 		   lkb the first_lkid */
2734 
2735 		r->res_first_lkid = 0;
2736 
2737 		if (!list_empty(&r->res_lookup)) {
2738 			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2739 					 lkb_rsb_lookup);
2740 			list_del_init(&lkb->lkb_rsb_lookup);
2741 			r->res_first_lkid = lkb->lkb_id;
2742 			_request_lock(r, lkb);
2743 		}
2744 		break;
2745 
2746 	default:
2747 		log_error(r->res_ls, "confirm_master unknown error %d", error);
2748 	}
2749 }
2750 
2751 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2752 			 int namelen, void (*ast)(void *astparam),
2753 			 void *astparam,
2754 			 void (*bast)(void *astparam, int mode),
2755 			 struct dlm_args *args)
2756 {
2757 	int rv = -EINVAL;
2758 
2759 	/* check for invalid arg usage */
2760 
2761 	if (mode < 0 || mode > DLM_LOCK_EX)
2762 		goto out;
2763 
2764 	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2765 		goto out;
2766 
2767 	if (flags & DLM_LKF_CANCEL)
2768 		goto out;
2769 
2770 	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2771 		goto out;
2772 
2773 	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2774 		goto out;
2775 
2776 	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2777 		goto out;
2778 
2779 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2780 		goto out;
2781 
2782 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2783 		goto out;
2784 
2785 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2786 		goto out;
2787 
2788 	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2789 		goto out;
2790 
2791 	if (!ast || !lksb)
2792 		goto out;
2793 
2794 	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2795 		goto out;
2796 
2797 	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2798 		goto out;
2799 
2800 	/* these args will be copied to the lkb in validate_lock_args,
2801 	   it cannot be done now because when converting locks, fields in
2802 	   an active lkb cannot be modified before locking the rsb */
2803 
2804 	args->flags = flags;
2805 	args->astfn = ast;
2806 	args->astparam = astparam;
2807 	args->bastfn = bast;
2808 	args->mode = mode;
2809 	args->lksb = lksb;
2810 	rv = 0;
2811  out:
2812 	return rv;
2813 }
2814 
2815 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2816 {
2817 	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2818  		      DLM_LKF_FORCEUNLOCK))
2819 		return -EINVAL;
2820 
2821 	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2822 		return -EINVAL;
2823 
2824 	args->flags = flags;
2825 	args->astparam = astarg;
2826 	return 0;
2827 }
2828 
2829 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2830 			      struct dlm_args *args)
2831 {
2832 	int rv = -EBUSY;
2833 
2834 	if (args->flags & DLM_LKF_CONVERT) {
2835 		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2836 			goto out;
2837 
2838 		/* lock not allowed if there's any op in progress */
2839 		if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2840 			goto out;
2841 
2842 		if (is_overlap(lkb))
2843 			goto out;
2844 
2845 		rv = -EINVAL;
2846 		if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
2847 			goto out;
2848 
2849 		if (args->flags & DLM_LKF_QUECVT &&
2850 		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2851 			goto out;
2852 	}
2853 
2854 	lkb->lkb_exflags = args->flags;
2855 	dlm_set_sbflags_val(lkb, 0);
2856 	lkb->lkb_astfn = args->astfn;
2857 	lkb->lkb_astparam = args->astparam;
2858 	lkb->lkb_bastfn = args->bastfn;
2859 	lkb->lkb_rqmode = args->mode;
2860 	lkb->lkb_lksb = args->lksb;
2861 	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2862 	lkb->lkb_ownpid = (int) current->pid;
2863 	rv = 0;
2864  out:
2865 	switch (rv) {
2866 	case 0:
2867 		break;
2868 	case -EINVAL:
2869 		/* annoy the user because dlm usage is wrong */
2870 		WARN_ON(1);
2871 		log_error(ls, "%s %d %x %x %x %d %d", __func__,
2872 			  rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2873 			  lkb->lkb_status, lkb->lkb_wait_type);
2874 		break;
2875 	default:
2876 		log_debug(ls, "%s %d %x %x %x %d %d", __func__,
2877 			  rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2878 			  lkb->lkb_status, lkb->lkb_wait_type);
2879 		break;
2880 	}
2881 
2882 	return rv;
2883 }
2884 
2885 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2886    for success */
2887 
2888 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2889    because there may be a lookup in progress and it's valid to do
2890    cancel/unlockf on it */
2891 
2892 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2893 {
2894 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2895 	int rv = -EBUSY;
2896 
2897 	/* normal unlock not allowed if there's any op in progress */
2898 	if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) &&
2899 	    (lkb->lkb_wait_type || lkb->lkb_wait_count))
2900 		goto out;
2901 
2902 	/* an lkb may be waiting for an rsb lookup to complete where the
2903 	   lookup was initiated by another lock */
2904 
2905 	if (!list_empty(&lkb->lkb_rsb_lookup)) {
2906 		if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2907 			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2908 			list_del_init(&lkb->lkb_rsb_lookup);
2909 			queue_cast(lkb->lkb_resource, lkb,
2910 				   args->flags & DLM_LKF_CANCEL ?
2911 				   -DLM_ECANCEL : -DLM_EUNLOCK);
2912 			unhold_lkb(lkb); /* undoes create_lkb() */
2913 		}
2914 		/* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2915 		goto out;
2916 	}
2917 
2918 	rv = -EINVAL;
2919 	if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
2920 		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2921 		dlm_print_lkb(lkb);
2922 		goto out;
2923 	}
2924 
2925 	/* an lkb may still exist even though the lock is EOL'ed due to a
2926 	 * cancel, unlock or failed noqueue request; an app can't use these
2927 	 * locks; return same error as if the lkid had not been found at all
2928 	 */
2929 
2930 	if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
2931 		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2932 		rv = -ENOENT;
2933 		goto out;
2934 	}
2935 
2936 	if (is_overlap_unlock(lkb))
2937 		goto out;
2938 
2939 	/* cancel not allowed with another cancel/unlock in progress */
2940 
2941 	if (args->flags & DLM_LKF_CANCEL) {
2942 		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2943 			goto out;
2944 
2945 		if (is_overlap_cancel(lkb))
2946 			goto out;
2947 
2948 		if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2949 			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2950 			rv = -EBUSY;
2951 			goto out;
2952 		}
2953 
2954 		/* there's nothing to cancel */
2955 		if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2956 		    !lkb->lkb_wait_type) {
2957 			rv = -EBUSY;
2958 			goto out;
2959 		}
2960 
2961 		switch (lkb->lkb_wait_type) {
2962 		case DLM_MSG_LOOKUP:
2963 		case DLM_MSG_REQUEST:
2964 			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2965 			rv = -EBUSY;
2966 			goto out;
2967 		case DLM_MSG_UNLOCK:
2968 		case DLM_MSG_CANCEL:
2969 			goto out;
2970 		}
2971 		/* add_to_waiters() will set OVERLAP_CANCEL */
2972 		goto out_ok;
2973 	}
2974 
2975 	/* do we need to allow a force-unlock if there's a normal unlock
2976 	   already in progress?  in what conditions could the normal unlock
2977 	   fail such that we'd want to send a force-unlock to be sure? */
2978 
2979 	if (args->flags & DLM_LKF_FORCEUNLOCK) {
2980 		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2981 			goto out;
2982 
2983 		if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2984 			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2985 			rv = -EBUSY;
2986 			goto out;
2987 		}
2988 
2989 		switch (lkb->lkb_wait_type) {
2990 		case DLM_MSG_LOOKUP:
2991 		case DLM_MSG_REQUEST:
2992 			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2993 			rv = -EBUSY;
2994 			goto out;
2995 		case DLM_MSG_UNLOCK:
2996 			goto out;
2997 		}
2998 		/* add_to_waiters() will set OVERLAP_UNLOCK */
2999 	}
3000 
3001  out_ok:
3002 	/* an overlapping op shouldn't blow away exflags from other op */
3003 	lkb->lkb_exflags |= args->flags;
3004 	dlm_set_sbflags_val(lkb, 0);
3005 	lkb->lkb_astparam = args->astparam;
3006 	rv = 0;
3007  out:
3008 	switch (rv) {
3009 	case 0:
3010 		break;
3011 	case -EINVAL:
3012 		/* annoy the user because dlm usage is wrong */
3013 		WARN_ON(1);
3014 		log_error(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
3015 			  lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
3016 			  args->flags, lkb->lkb_wait_type,
3017 			  lkb->lkb_resource->res_name);
3018 		break;
3019 	default:
3020 		log_debug(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
3021 			  lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
3022 			  args->flags, lkb->lkb_wait_type,
3023 			  lkb->lkb_resource->res_name);
3024 		break;
3025 	}
3026 
3027 	return rv;
3028 }
3029 
3030 /*
3031  * Four stage 4 varieties:
3032  * do_request(), do_convert(), do_unlock(), do_cancel()
3033  * These are called on the master node for the given lock and
3034  * from the central locking logic.
3035  */
3036 
3037 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3038 {
3039 	int error = 0;
3040 
3041 	if (can_be_granted(r, lkb, 1, 0, NULL)) {
3042 		grant_lock(r, lkb);
3043 		queue_cast(r, lkb, 0);
3044 		goto out;
3045 	}
3046 
3047 	if (can_be_queued(lkb)) {
3048 		error = -EINPROGRESS;
3049 		add_lkb(r, lkb, DLM_LKSTS_WAITING);
3050 		goto out;
3051 	}
3052 
3053 	error = -EAGAIN;
3054 	queue_cast(r, lkb, -EAGAIN);
3055  out:
3056 	return error;
3057 }
3058 
3059 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3060 			       int error)
3061 {
3062 	switch (error) {
3063 	case -EAGAIN:
3064 		if (force_blocking_asts(lkb))
3065 			send_blocking_asts_all(r, lkb);
3066 		break;
3067 	case -EINPROGRESS:
3068 		send_blocking_asts(r, lkb);
3069 		break;
3070 	}
3071 }
3072 
3073 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3074 {
3075 	int error = 0;
3076 	int deadlk = 0;
3077 
3078 	/* changing an existing lock may allow others to be granted */
3079 
3080 	if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3081 		grant_lock(r, lkb);
3082 		queue_cast(r, lkb, 0);
3083 		goto out;
3084 	}
3085 
3086 	/* can_be_granted() detected that this lock would block in a conversion
3087 	   deadlock, so we leave it on the granted queue and return EDEADLK in
3088 	   the ast for the convert. */
3089 
3090 	if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
3091 		/* it's left on the granted queue */
3092 		revert_lock(r, lkb);
3093 		queue_cast(r, lkb, -EDEADLK);
3094 		error = -EDEADLK;
3095 		goto out;
3096 	}
3097 
3098 	/* is_demoted() means the can_be_granted() above set the grmode
3099 	   to NL, and left us on the granted queue.  This auto-demotion
3100 	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
3101 	   now grantable.  We have to try to grant other converting locks
3102 	   before we try again to grant this one. */
3103 
3104 	if (is_demoted(lkb)) {
3105 		grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3106 		if (_can_be_granted(r, lkb, 1, 0)) {
3107 			grant_lock(r, lkb);
3108 			queue_cast(r, lkb, 0);
3109 			goto out;
3110 		}
3111 		/* else fall through and move to convert queue */
3112 	}
3113 
3114 	if (can_be_queued(lkb)) {
3115 		error = -EINPROGRESS;
3116 		del_lkb(r, lkb);
3117 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3118 		goto out;
3119 	}
3120 
3121 	error = -EAGAIN;
3122 	queue_cast(r, lkb, -EAGAIN);
3123  out:
3124 	return error;
3125 }
3126 
3127 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3128 			       int error)
3129 {
3130 	switch (error) {
3131 	case 0:
3132 		grant_pending_locks(r, NULL);
3133 		/* grant_pending_locks also sends basts */
3134 		break;
3135 	case -EAGAIN:
3136 		if (force_blocking_asts(lkb))
3137 			send_blocking_asts_all(r, lkb);
3138 		break;
3139 	case -EINPROGRESS:
3140 		send_blocking_asts(r, lkb);
3141 		break;
3142 	}
3143 }
3144 
3145 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3146 {
3147 	remove_lock(r, lkb);
3148 	queue_cast(r, lkb, -DLM_EUNLOCK);
3149 	return -DLM_EUNLOCK;
3150 }
3151 
3152 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3153 			      int error)
3154 {
3155 	grant_pending_locks(r, NULL);
3156 }
3157 
3158 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3159 
3160 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3161 {
3162 	int error;
3163 
3164 	error = revert_lock(r, lkb);
3165 	if (error) {
3166 		queue_cast(r, lkb, -DLM_ECANCEL);
3167 		return -DLM_ECANCEL;
3168 	}
3169 	return 0;
3170 }
3171 
3172 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3173 			      int error)
3174 {
3175 	if (error)
3176 		grant_pending_locks(r, NULL);
3177 }
3178 
3179 /*
3180  * Four stage 3 varieties:
3181  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3182  */
3183 
/*
 * Stage 3 for a request: add a new lkb to a possibly new rsb, called by
 * the requesting process.
 *
 * A negative set_master() result is returned as is; a positive one ends
 * the call with 0.  Otherwise the request is sent to the master when the
 * rsb is remote, or handled here with do_request() + effects.
 */

static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rc;

	/* set_master: sets lkb nodeid from r */
	rc = set_master(r, lkb);
	if (rc < 0)
		return rc;
	if (rc > 0)
		return 0;

	/* receive_request() calls do_request() on remote node */
	if (is_remote(r))
		return send_request(r, lkb);

	rc = do_request(r, lkb);
	/* for remote locks the request_reply is sent
	   between do_request and do_request_effects */
	do_request_effects(r, lkb, rc);

	return rc;
}
3212 
/*
 * Stage 3 for a convert: change some property of an existing lkb, e.g.
 * its mode.  Remote rsbs get a convert message; local ones run
 * do_convert() followed by its effects.
 */

static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rc;

	/* receive_convert() calls do_convert() on remote node */
	if (is_remote(r))
		return send_convert(r, lkb);

	rc = do_convert(r, lkb);
	/* for remote locks the convert_reply is sent
	   between do_convert and do_convert_effects */
	do_convert_effects(r, lkb, rc);

	return rc;
}
3231 
/*
 * Stage 3 for an unlock: remove an existing lkb from the granted queue.
 * Remote rsbs get an unlock message; local ones run do_unlock() followed
 * by its effects.
 */

static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rc;

	/* receive_unlock() calls do_unlock() on remote node */
	if (is_remote(r))
		return send_unlock(r, lkb);

	rc = do_unlock(r, lkb);
	/* for remote locks the unlock_reply is sent
	   between do_unlock and do_unlock_effects */
	do_unlock_effects(r, lkb, rc);

	return rc;
}
3250 
/*
 * Stage 3 for a cancel: remove an existing lkb from the convert or wait
 * queue.  Remote rsbs get a cancel message; local ones run do_cancel()
 * followed by its effects.
 */

static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rc;

	/* receive_cancel() calls do_cancel() on remote node */
	if (is_remote(r))
		return send_cancel(r, lkb);

	rc = do_cancel(r, lkb);
	/* for remote locks the cancel_reply is sent
	   between do_cancel and do_cancel_effects */
	do_cancel_effects(r, lkb, rc);

	return rc;
}
3269 
3270 /*
3271  * Four stage 2 varieties:
3272  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3273  */
3274 
3275 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3276 			const void *name, int len,
3277 			struct dlm_args *args)
3278 {
3279 	struct dlm_rsb *r;
3280 	int error;
3281 
3282 	error = validate_lock_args(ls, lkb, args);
3283 	if (error)
3284 		return error;
3285 
3286 	error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3287 	if (error)
3288 		return error;
3289 
3290 	lock_rsb(r);
3291 
3292 	attach_lkb(r, lkb);
3293 	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3294 
3295 	error = _request_lock(r, lkb);
3296 
3297 	unlock_rsb(r);
3298 	put_rsb(r);
3299 	return error;
3300 }
3301 
3302 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3303 			struct dlm_args *args)
3304 {
3305 	struct dlm_rsb *r;
3306 	int error;
3307 
3308 	r = lkb->lkb_resource;
3309 
3310 	hold_rsb(r);
3311 	lock_rsb(r);
3312 
3313 	error = validate_lock_args(ls, lkb, args);
3314 	if (error)
3315 		goto out;
3316 
3317 	error = _convert_lock(r, lkb);
3318  out:
3319 	unlock_rsb(r);
3320 	put_rsb(r);
3321 	return error;
3322 }
3323 
3324 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3325 		       struct dlm_args *args)
3326 {
3327 	struct dlm_rsb *r;
3328 	int error;
3329 
3330 	r = lkb->lkb_resource;
3331 
3332 	hold_rsb(r);
3333 	lock_rsb(r);
3334 
3335 	error = validate_unlock_args(lkb, args);
3336 	if (error)
3337 		goto out;
3338 
3339 	error = _unlock_lock(r, lkb);
3340  out:
3341 	unlock_rsb(r);
3342 	put_rsb(r);
3343 	return error;
3344 }
3345 
3346 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3347 		       struct dlm_args *args)
3348 {
3349 	struct dlm_rsb *r;
3350 	int error;
3351 
3352 	r = lkb->lkb_resource;
3353 
3354 	hold_rsb(r);
3355 	lock_rsb(r);
3356 
3357 	error = validate_unlock_args(lkb, args);
3358 	if (error)
3359 		goto out;
3360 
3361 	error = _cancel_lock(r, lkb);
3362  out:
3363 	unlock_rsb(r);
3364 	put_rsb(r);
3365 	return error;
3366 }
3367 
3368 /*
3369  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
3370  */
3371 
3372 int dlm_lock(dlm_lockspace_t *lockspace,
3373 	     int mode,
3374 	     struct dlm_lksb *lksb,
3375 	     uint32_t flags,
3376 	     const void *name,
3377 	     unsigned int namelen,
3378 	     uint32_t parent_lkid,
3379 	     void (*ast) (void *astarg),
3380 	     void *astarg,
3381 	     void (*bast) (void *astarg, int mode))
3382 {
3383 	struct dlm_ls *ls;
3384 	struct dlm_lkb *lkb;
3385 	struct dlm_args args;
3386 	int error, convert = flags & DLM_LKF_CONVERT;
3387 
3388 	ls = dlm_find_lockspace_local(lockspace);
3389 	if (!ls)
3390 		return -EINVAL;
3391 
3392 	dlm_lock_recovery(ls);
3393 
3394 	if (convert)
3395 		error = find_lkb(ls, lksb->sb_lkid, &lkb);
3396 	else
3397 		error = create_lkb(ls, &lkb);
3398 
3399 	if (error)
3400 		goto out;
3401 
3402 	trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
3403 
3404 	error = set_lock_args(mode, lksb, flags, namelen, ast, astarg, bast,
3405 			      &args);
3406 	if (error)
3407 		goto out_put;
3408 
3409 	if (convert)
3410 		error = convert_lock(ls, lkb, &args);
3411 	else
3412 		error = request_lock(ls, lkb, name, namelen, &args);
3413 
3414 	if (error == -EINPROGRESS)
3415 		error = 0;
3416  out_put:
3417 	trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true);
3418 
3419 	if (convert || error)
3420 		__put_lkb(ls, lkb);
3421 	if (error == -EAGAIN || error == -EDEADLK)
3422 		error = 0;
3423  out:
3424 	dlm_unlock_recovery(ls);
3425 	dlm_put_lockspace(ls);
3426 	return error;
3427 }
3428 
3429 int dlm_unlock(dlm_lockspace_t *lockspace,
3430 	       uint32_t lkid,
3431 	       uint32_t flags,
3432 	       struct dlm_lksb *lksb,
3433 	       void *astarg)
3434 {
3435 	struct dlm_ls *ls;
3436 	struct dlm_lkb *lkb;
3437 	struct dlm_args args;
3438 	int error;
3439 
3440 	ls = dlm_find_lockspace_local(lockspace);
3441 	if (!ls)
3442 		return -EINVAL;
3443 
3444 	dlm_lock_recovery(ls);
3445 
3446 	error = find_lkb(ls, lkid, &lkb);
3447 	if (error)
3448 		goto out;
3449 
3450 	trace_dlm_unlock_start(ls, lkb, flags);
3451 
3452 	error = set_unlock_args(flags, astarg, &args);
3453 	if (error)
3454 		goto out_put;
3455 
3456 	if (flags & DLM_LKF_CANCEL)
3457 		error = cancel_lock(ls, lkb, &args);
3458 	else
3459 		error = unlock_lock(ls, lkb, &args);
3460 
3461 	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3462 		error = 0;
3463 	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3464 		error = 0;
3465  out_put:
3466 	trace_dlm_unlock_end(ls, lkb, flags, error);
3467 
3468 	dlm_put_lkb(lkb);
3469  out:
3470 	dlm_unlock_recovery(ls);
3471 	dlm_put_lockspace(ls);
3472 	return error;
3473 }
3474 
3475 /*
3476  * send/receive routines for remote operations and replies
3477  *
3478  * send_args
3479  * send_common
3480  * send_request			receive_request
3481  * send_convert			receive_convert
3482  * send_unlock			receive_unlock
3483  * send_cancel			receive_cancel
3484  * send_grant			receive_grant
3485  * send_bast			receive_bast
3486  * send_lookup			receive_lookup
3487  * send_remove			receive_remove
3488  *
3489  * 				send_common_reply
3490  * receive_request_reply	send_request_reply
3491  * receive_convert_reply	send_convert_reply
3492  * receive_unlock_reply		send_unlock_reply
3493  * receive_cancel_reply		send_cancel_reply
3494  * receive_lookup_reply		send_lookup_reply
3495  */
3496 
3497 static int _create_message(struct dlm_ls *ls, int mb_len,
3498 			   int to_nodeid, int mstype,
3499 			   struct dlm_message **ms_ret,
3500 			   struct dlm_mhandle **mh_ret)
3501 {
3502 	struct dlm_message *ms;
3503 	struct dlm_mhandle *mh;
3504 	char *mb;
3505 
3506 	/* get_buffer gives us a message handle (mh) that we need to
3507 	   pass into midcomms_commit and a message buffer (mb) that we
3508 	   write our data into */
3509 
3510 	mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb);
3511 	if (!mh)
3512 		return -ENOBUFS;
3513 
3514 	ms = (struct dlm_message *) mb;
3515 
3516 	ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3517 	ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
3518 	ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
3519 	ms->m_header.h_length = cpu_to_le16(mb_len);
3520 	ms->m_header.h_cmd = DLM_MSG;
3521 
3522 	ms->m_type = cpu_to_le32(mstype);
3523 
3524 	*mh_ret = mh;
3525 	*ms_ret = ms;
3526 	return 0;
3527 }
3528 
3529 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3530 			  int to_nodeid, int mstype,
3531 			  struct dlm_message **ms_ret,
3532 			  struct dlm_mhandle **mh_ret)
3533 {
3534 	int mb_len = sizeof(struct dlm_message);
3535 
3536 	switch (mstype) {
3537 	case DLM_MSG_REQUEST:
3538 	case DLM_MSG_LOOKUP:
3539 	case DLM_MSG_REMOVE:
3540 		mb_len += r->res_length;
3541 		break;
3542 	case DLM_MSG_CONVERT:
3543 	case DLM_MSG_UNLOCK:
3544 	case DLM_MSG_REQUEST_REPLY:
3545 	case DLM_MSG_CONVERT_REPLY:
3546 	case DLM_MSG_GRANT:
3547 		if (lkb && lkb->lkb_lvbptr && (lkb->lkb_exflags & DLM_LKF_VALBLK))
3548 			mb_len += r->res_ls->ls_lvblen;
3549 		break;
3550 	}
3551 
3552 	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3553 			       ms_ret, mh_ret);
3554 }
3555 
/*
 * Commit the message handle @mh together with @name/@namelen.  @ms is not
 * used.  Always returns 0 for now; further lowcomms enhancements or
 * alternate implementations may make the return value useful at some
 * point.
 */

static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms,
			const void *name, int namelen)
{
	dlm_midcomms_commit_mhandle(mh, name, namelen);

	return 0;
}
3565 
3566 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3567 		      struct dlm_message *ms)
3568 {
3569 	ms->m_nodeid   = cpu_to_le32(lkb->lkb_nodeid);
3570 	ms->m_pid      = cpu_to_le32(lkb->lkb_ownpid);
3571 	ms->m_lkid     = cpu_to_le32(lkb->lkb_id);
3572 	ms->m_remid    = cpu_to_le32(lkb->lkb_remid);
3573 	ms->m_exflags  = cpu_to_le32(lkb->lkb_exflags);
3574 	ms->m_sbflags  = cpu_to_le32(dlm_sbflags_val(lkb));
3575 	ms->m_flags    = cpu_to_le32(dlm_dflags_val(lkb));
3576 	ms->m_lvbseq   = cpu_to_le32(lkb->lkb_lvbseq);
3577 	ms->m_status   = cpu_to_le32(lkb->lkb_status);
3578 	ms->m_grmode   = cpu_to_le32(lkb->lkb_grmode);
3579 	ms->m_rqmode   = cpu_to_le32(lkb->lkb_rqmode);
3580 	ms->m_hash     = cpu_to_le32(r->res_hash);
3581 
3582 	/* m_result and m_bastmode are set from function args,
3583 	   not from lkb fields */
3584 
3585 	if (lkb->lkb_bastfn)
3586 		ms->m_asts |= cpu_to_le32(DLM_CB_BAST);
3587 	if (lkb->lkb_astfn)
3588 		ms->m_asts |= cpu_to_le32(DLM_CB_CAST);
3589 
3590 	/* compare with switch in create_message; send_remove() doesn't
3591 	   use send_args() */
3592 
3593 	switch (ms->m_type) {
3594 	case cpu_to_le32(DLM_MSG_REQUEST):
3595 	case cpu_to_le32(DLM_MSG_LOOKUP):
3596 		memcpy(ms->m_extra, r->res_name, r->res_length);
3597 		break;
3598 	case cpu_to_le32(DLM_MSG_CONVERT):
3599 	case cpu_to_le32(DLM_MSG_UNLOCK):
3600 	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3601 	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3602 	case cpu_to_le32(DLM_MSG_GRANT):
3603 		if (!lkb->lkb_lvbptr || !(lkb->lkb_exflags & DLM_LKF_VALBLK))
3604 			break;
3605 		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3606 		break;
3607 	}
3608 }
3609 
3610 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3611 {
3612 	struct dlm_message *ms;
3613 	struct dlm_mhandle *mh;
3614 	int to_nodeid, error;
3615 
3616 	to_nodeid = r->res_nodeid;
3617 
3618 	add_to_waiters(lkb, mstype, to_nodeid);
3619 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3620 	if (error)
3621 		goto fail;
3622 
3623 	send_args(r, lkb, ms);
3624 
3625 	error = send_message(mh, ms, r->res_name, r->res_length);
3626 	if (error)
3627 		goto fail;
3628 	return 0;
3629 
3630  fail:
3631 	remove_from_waiters(lkb, msg_reply_type(mstype));
3632 	return error;
3633 }
3634 
3635 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3636 {
3637 	return send_common(r, lkb, DLM_MSG_REQUEST);
3638 }
3639 
3640 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3641 {
3642 	int error;
3643 
3644 	error = send_common(r, lkb, DLM_MSG_CONVERT);
3645 
3646 	/* down conversions go without a reply from the master */
3647 	if (!error && down_conversion(lkb)) {
3648 		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3649 		r->res_ls->ls_local_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
3650 		r->res_ls->ls_local_ms.m_result = 0;
3651 		__receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms, true);
3652 	}
3653 
3654 	return error;
3655 }
3656 
3657 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3658    MASTER_UNCERTAIN to force the next request on the rsb to confirm
3659    that the master is still correct. */
3660 
3661 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3662 {
3663 	return send_common(r, lkb, DLM_MSG_UNLOCK);
3664 }
3665 
3666 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3667 {
3668 	return send_common(r, lkb, DLM_MSG_CANCEL);
3669 }
3670 
3671 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3672 {
3673 	struct dlm_message *ms;
3674 	struct dlm_mhandle *mh;
3675 	int to_nodeid, error;
3676 
3677 	to_nodeid = lkb->lkb_nodeid;
3678 
3679 	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3680 	if (error)
3681 		goto out;
3682 
3683 	send_args(r, lkb, ms);
3684 
3685 	ms->m_result = 0;
3686 
3687 	error = send_message(mh, ms, r->res_name, r->res_length);
3688  out:
3689 	return error;
3690 }
3691 
3692 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3693 {
3694 	struct dlm_message *ms;
3695 	struct dlm_mhandle *mh;
3696 	int to_nodeid, error;
3697 
3698 	to_nodeid = lkb->lkb_nodeid;
3699 
3700 	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3701 	if (error)
3702 		goto out;
3703 
3704 	send_args(r, lkb, ms);
3705 
3706 	ms->m_bastmode = cpu_to_le32(mode);
3707 
3708 	error = send_message(mh, ms, r->res_name, r->res_length);
3709  out:
3710 	return error;
3711 }
3712 
3713 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3714 {
3715 	struct dlm_message *ms;
3716 	struct dlm_mhandle *mh;
3717 	int to_nodeid, error;
3718 
3719 	to_nodeid = dlm_dir_nodeid(r);
3720 
3721 	add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3722 	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3723 	if (error)
3724 		goto fail;
3725 
3726 	send_args(r, lkb, ms);
3727 
3728 	error = send_message(mh, ms, r->res_name, r->res_length);
3729 	if (error)
3730 		goto fail;
3731 	return 0;
3732 
3733  fail:
3734 	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3735 	return error;
3736 }
3737 
3738 static int send_remove(struct dlm_rsb *r)
3739 {
3740 	struct dlm_message *ms;
3741 	struct dlm_mhandle *mh;
3742 	int to_nodeid, error;
3743 
3744 	to_nodeid = dlm_dir_nodeid(r);
3745 
3746 	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3747 	if (error)
3748 		goto out;
3749 
3750 	memcpy(ms->m_extra, r->res_name, r->res_length);
3751 	ms->m_hash = cpu_to_le32(r->res_hash);
3752 
3753 	error = send_message(mh, ms, r->res_name, r->res_length);
3754  out:
3755 	return error;
3756 }
3757 
3758 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3759 			     int mstype, int rv)
3760 {
3761 	struct dlm_message *ms;
3762 	struct dlm_mhandle *mh;
3763 	int to_nodeid, error;
3764 
3765 	to_nodeid = lkb->lkb_nodeid;
3766 
3767 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3768 	if (error)
3769 		goto out;
3770 
3771 	send_args(r, lkb, ms);
3772 
3773 	ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3774 
3775 	error = send_message(mh, ms, r->res_name, r->res_length);
3776  out:
3777 	return error;
3778 }
3779 
3780 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3781 {
3782 	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3783 }
3784 
3785 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3786 {
3787 	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3788 }
3789 
3790 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3791 {
3792 	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3793 }
3794 
3795 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3796 {
3797 	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3798 }
3799 
3800 static int send_lookup_reply(struct dlm_ls *ls,
3801 			     const struct dlm_message *ms_in, int ret_nodeid,
3802 			     int rv)
3803 {
3804 	struct dlm_rsb *r = &ls->ls_local_rsb;
3805 	struct dlm_message *ms;
3806 	struct dlm_mhandle *mh;
3807 	int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
3808 
3809 	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3810 	if (error)
3811 		goto out;
3812 
3813 	ms->m_lkid = ms_in->m_lkid;
3814 	ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3815 	ms->m_nodeid = cpu_to_le32(ret_nodeid);
3816 
3817 	error = send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in));
3818  out:
3819 	return error;
3820 }
3821 
3822 /* which args we save from a received message depends heavily on the type
3823    of message, unlike the send side where we can safely send everything about
3824    the lkb for any type of message */
3825 
3826 static void receive_flags(struct dlm_lkb *lkb, const struct dlm_message *ms)
3827 {
3828 	lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
3829 	dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3830 	dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3831 }
3832 
3833 static void receive_flags_reply(struct dlm_lkb *lkb,
3834 				const struct dlm_message *ms,
3835 				bool local)
3836 {
3837 	if (local)
3838 		return;
3839 
3840 	dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3841 	dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3842 }
3843 
3844 static int receive_extralen(const struct dlm_message *ms)
3845 {
3846 	return (le16_to_cpu(ms->m_header.h_length) -
3847 		sizeof(struct dlm_message));
3848 }
3849 
3850 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3851 		       const struct dlm_message *ms)
3852 {
3853 	int len;
3854 
3855 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3856 		if (!lkb->lkb_lvbptr)
3857 			lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3858 		if (!lkb->lkb_lvbptr)
3859 			return -ENOMEM;
3860 		len = receive_extralen(ms);
3861 		if (len > ls->ls_lvblen)
3862 			len = ls->ls_lvblen;
3863 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3864 	}
3865 	return 0;
3866 }
3867 
/*
 * Placeholder blocking-ast callback; receive_request_args() stores it in
 * lkb_bastfn when the requester advertised a bast.  It only logs that it
 * should not have been called.
 */
static void fake_bastfn(void *astparam, int mode)
{
	log_print("fake_bastfn should not be called");
}
3872 
/*
 * Placeholder completion-ast callback; receive_request_args() stores it
 * in lkb_astfn when the requester advertised a cast.  It only logs that
 * it should not have been called.
 */
static void fake_astfn(void *astparam)
{
	log_print("fake_astfn should not be called");
}
3877 
3878 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3879 				const struct dlm_message *ms)
3880 {
3881 	lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3882 	lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
3883 	lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3884 	lkb->lkb_grmode = DLM_LOCK_IV;
3885 	lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3886 
3887 	lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;
3888 	lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL;
3889 
3890 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3891 		/* lkb was just created so there won't be an lvb yet */
3892 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3893 		if (!lkb->lkb_lvbptr)
3894 			return -ENOMEM;
3895 	}
3896 
3897 	return 0;
3898 }
3899 
3900 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3901 				const struct dlm_message *ms)
3902 {
3903 	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3904 		return -EBUSY;
3905 
3906 	if (receive_lvb(ls, lkb, ms))
3907 		return -ENOMEM;
3908 
3909 	lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3910 	lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
3911 
3912 	return 0;
3913 }
3914 
3915 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3916 			       const struct dlm_message *ms)
3917 {
3918 	if (receive_lvb(ls, lkb, ms))
3919 		return -ENOMEM;
3920 	return 0;
3921 }
3922 
3923 /* We fill in the local-lkb fields with the info that send_xxxx_reply()
3924    uses to send a reply and that the remote end uses to process the reply. */
3925 
3926 static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms)
3927 {
3928 	struct dlm_lkb *lkb = &ls->ls_local_lkb;
3929 	lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3930 	lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3931 }
3932 
3933 /* This is called after the rsb is locked so that we can safely inspect
3934    fields in the lkb. */
3935 
3936 static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
3937 {
3938 	int from = le32_to_cpu(ms->m_header.h_nodeid);
3939 	int error = 0;
3940 
3941 	/* currently mixing of user/kernel locks are not supported */
3942 	if (ms->m_flags & cpu_to_le32(BIT(DLM_DFL_USER_BIT)) &&
3943 	    !test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
3944 		log_error(lkb->lkb_resource->res_ls,
3945 			  "got user dlm message for a kernel lock");
3946 		error = -EINVAL;
3947 		goto out;
3948 	}
3949 
3950 	switch (ms->m_type) {
3951 	case cpu_to_le32(DLM_MSG_CONVERT):
3952 	case cpu_to_le32(DLM_MSG_UNLOCK):
3953 	case cpu_to_le32(DLM_MSG_CANCEL):
3954 		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3955 			error = -EINVAL;
3956 		break;
3957 
3958 	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3959 	case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
3960 	case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
3961 	case cpu_to_le32(DLM_MSG_GRANT):
3962 	case cpu_to_le32(DLM_MSG_BAST):
3963 		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3964 			error = -EINVAL;
3965 		break;
3966 
3967 	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3968 		if (!is_process_copy(lkb))
3969 			error = -EINVAL;
3970 		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3971 			error = -EINVAL;
3972 		break;
3973 
3974 	default:
3975 		error = -EINVAL;
3976 	}
3977 
3978 out:
3979 	if (error)
3980 		log_error(lkb->lkb_resource->res_ls,
3981 			  "ignore invalid message %d from %d %x %x %x %d",
3982 			  le32_to_cpu(ms->m_type), from, lkb->lkb_id,
3983 			  lkb->lkb_remid, dlm_iflags_val(lkb),
3984 			  lkb->lkb_nodeid);
3985 	return error;
3986 }
3987 
3988 static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms)
3989 {
3990 	struct dlm_lkb *lkb;
3991 	struct dlm_rsb *r;
3992 	int from_nodeid;
3993 	int error, namelen = 0;
3994 
3995 	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3996 
3997 	error = create_lkb(ls, &lkb);
3998 	if (error)
3999 		goto fail;
4000 
4001 	receive_flags(lkb, ms);
4002 	set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
4003 	error = receive_request_args(ls, lkb, ms);
4004 	if (error) {
4005 		__put_lkb(ls, lkb);
4006 		goto fail;
4007 	}
4008 
4009 	/* The dir node is the authority on whether we are the master
4010 	   for this rsb or not, so if the master sends us a request, we should
4011 	   recreate the rsb if we've destroyed it.   This race happens when we
4012 	   send a remove message to the dir node at the same time that the dir
4013 	   node sends us a request for the rsb. */
4014 
4015 	namelen = receive_extralen(ms);
4016 
4017 	error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4018 			 R_RECEIVE_REQUEST, &r);
4019 	if (error) {
4020 		__put_lkb(ls, lkb);
4021 		goto fail;
4022 	}
4023 
4024 	lock_rsb(r);
4025 
4026 	if (r->res_master_nodeid != dlm_our_nodeid()) {
4027 		error = validate_master_nodeid(ls, r, from_nodeid);
4028 		if (error) {
4029 			unlock_rsb(r);
4030 			put_rsb(r);
4031 			__put_lkb(ls, lkb);
4032 			goto fail;
4033 		}
4034 	}
4035 
4036 	attach_lkb(r, lkb);
4037 	error = do_request(r, lkb);
4038 	send_request_reply(r, lkb, error);
4039 	do_request_effects(r, lkb, error);
4040 
4041 	unlock_rsb(r);
4042 	put_rsb(r);
4043 
4044 	if (error == -EINPROGRESS)
4045 		error = 0;
4046 	if (error)
4047 		dlm_put_lkb(lkb);
4048 	return 0;
4049 
4050  fail:
4051 	/* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4052 	   and do this receive_request again from process_lookup_list once
4053 	   we get the lookup reply.  This would avoid a many repeated
4054 	   ENOTBLK request failures when the lookup reply designating us
4055 	   as master is delayed. */
4056 
4057 	if (error != -ENOTBLK) {
4058 		log_limit(ls, "receive_request %x from %d %d",
4059 			  le32_to_cpu(ms->m_lkid), from_nodeid, error);
4060 	}
4061 
4062 	setup_local_lkb(ls, ms);
4063 	send_request_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4064 	return error;
4065 }
4066 
4067 static int receive_convert(struct dlm_ls *ls, const struct dlm_message *ms)
4068 {
4069 	struct dlm_lkb *lkb;
4070 	struct dlm_rsb *r;
4071 	int error, reply = 1;
4072 
4073 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4074 	if (error)
4075 		goto fail;
4076 
4077 	if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4078 		log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4079 			  "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4080 			  (unsigned long long)lkb->lkb_recover_seq,
4081 			  le32_to_cpu(ms->m_header.h_nodeid),
4082 			  le32_to_cpu(ms->m_lkid));
4083 		error = -ENOENT;
4084 		dlm_put_lkb(lkb);
4085 		goto fail;
4086 	}
4087 
4088 	r = lkb->lkb_resource;
4089 
4090 	hold_rsb(r);
4091 	lock_rsb(r);
4092 
4093 	error = validate_message(lkb, ms);
4094 	if (error)
4095 		goto out;
4096 
4097 	receive_flags(lkb, ms);
4098 
4099 	error = receive_convert_args(ls, lkb, ms);
4100 	if (error) {
4101 		send_convert_reply(r, lkb, error);
4102 		goto out;
4103 	}
4104 
4105 	reply = !down_conversion(lkb);
4106 
4107 	error = do_convert(r, lkb);
4108 	if (reply)
4109 		send_convert_reply(r, lkb, error);
4110 	do_convert_effects(r, lkb, error);
4111  out:
4112 	unlock_rsb(r);
4113 	put_rsb(r);
4114 	dlm_put_lkb(lkb);
4115 	return 0;
4116 
4117  fail:
4118 	setup_local_lkb(ls, ms);
4119 	send_convert_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4120 	return error;
4121 }
4122 
4123 static int receive_unlock(struct dlm_ls *ls, const struct dlm_message *ms)
4124 {
4125 	struct dlm_lkb *lkb;
4126 	struct dlm_rsb *r;
4127 	int error;
4128 
4129 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4130 	if (error)
4131 		goto fail;
4132 
4133 	if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4134 		log_error(ls, "receive_unlock %x remid %x remote %d %x",
4135 			  lkb->lkb_id, lkb->lkb_remid,
4136 			  le32_to_cpu(ms->m_header.h_nodeid),
4137 			  le32_to_cpu(ms->m_lkid));
4138 		error = -ENOENT;
4139 		dlm_put_lkb(lkb);
4140 		goto fail;
4141 	}
4142 
4143 	r = lkb->lkb_resource;
4144 
4145 	hold_rsb(r);
4146 	lock_rsb(r);
4147 
4148 	error = validate_message(lkb, ms);
4149 	if (error)
4150 		goto out;
4151 
4152 	receive_flags(lkb, ms);
4153 
4154 	error = receive_unlock_args(ls, lkb, ms);
4155 	if (error) {
4156 		send_unlock_reply(r, lkb, error);
4157 		goto out;
4158 	}
4159 
4160 	error = do_unlock(r, lkb);
4161 	send_unlock_reply(r, lkb, error);
4162 	do_unlock_effects(r, lkb, error);
4163  out:
4164 	unlock_rsb(r);
4165 	put_rsb(r);
4166 	dlm_put_lkb(lkb);
4167 	return 0;
4168 
4169  fail:
4170 	setup_local_lkb(ls, ms);
4171 	send_unlock_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4172 	return error;
4173 }
4174 
4175 static int receive_cancel(struct dlm_ls *ls, const struct dlm_message *ms)
4176 {
4177 	struct dlm_lkb *lkb;
4178 	struct dlm_rsb *r;
4179 	int error;
4180 
4181 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4182 	if (error)
4183 		goto fail;
4184 
4185 	receive_flags(lkb, ms);
4186 
4187 	r = lkb->lkb_resource;
4188 
4189 	hold_rsb(r);
4190 	lock_rsb(r);
4191 
4192 	error = validate_message(lkb, ms);
4193 	if (error)
4194 		goto out;
4195 
4196 	error = do_cancel(r, lkb);
4197 	send_cancel_reply(r, lkb, error);
4198 	do_cancel_effects(r, lkb, error);
4199  out:
4200 	unlock_rsb(r);
4201 	put_rsb(r);
4202 	dlm_put_lkb(lkb);
4203 	return 0;
4204 
4205  fail:
4206 	setup_local_lkb(ls, ms);
4207 	send_cancel_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4208 	return error;
4209 }
4210 
4211 static int receive_grant(struct dlm_ls *ls, const struct dlm_message *ms)
4212 {
4213 	struct dlm_lkb *lkb;
4214 	struct dlm_rsb *r;
4215 	int error;
4216 
4217 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4218 	if (error)
4219 		return error;
4220 
4221 	r = lkb->lkb_resource;
4222 
4223 	hold_rsb(r);
4224 	lock_rsb(r);
4225 
4226 	error = validate_message(lkb, ms);
4227 	if (error)
4228 		goto out;
4229 
4230 	receive_flags_reply(lkb, ms, false);
4231 	if (is_altmode(lkb))
4232 		munge_altmode(lkb, ms);
4233 	grant_lock_pc(r, lkb, ms);
4234 	queue_cast(r, lkb, 0);
4235  out:
4236 	unlock_rsb(r);
4237 	put_rsb(r);
4238 	dlm_put_lkb(lkb);
4239 	return 0;
4240 }
4241 
4242 static int receive_bast(struct dlm_ls *ls, const struct dlm_message *ms)
4243 {
4244 	struct dlm_lkb *lkb;
4245 	struct dlm_rsb *r;
4246 	int error;
4247 
4248 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4249 	if (error)
4250 		return error;
4251 
4252 	r = lkb->lkb_resource;
4253 
4254 	hold_rsb(r);
4255 	lock_rsb(r);
4256 
4257 	error = validate_message(lkb, ms);
4258 	if (error)
4259 		goto out;
4260 
4261 	queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode));
4262 	lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
4263  out:
4264 	unlock_rsb(r);
4265 	put_rsb(r);
4266 	dlm_put_lkb(lkb);
4267 	return 0;
4268 }
4269 
4270 static void receive_lookup(struct dlm_ls *ls, const struct dlm_message *ms)
4271 {
4272 	int len, error, ret_nodeid, from_nodeid, our_nodeid;
4273 
4274 	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4275 	our_nodeid = dlm_our_nodeid();
4276 
4277 	len = receive_extralen(ms);
4278 
4279 	error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4280 				  &ret_nodeid, NULL);
4281 
4282 	/* Optimization: we're master so treat lookup as a request */
4283 	if (!error && ret_nodeid == our_nodeid) {
4284 		receive_request(ls, ms);
4285 		return;
4286 	}
4287 	send_lookup_reply(ls, ms, ret_nodeid, error);
4288 }
4289 
4290 static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
4291 {
4292 	char name[DLM_RESNAME_MAXLEN+1];
4293 	struct dlm_rsb *r;
4294 	int rv, len, dir_nodeid, from_nodeid;
4295 
4296 	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4297 
4298 	len = receive_extralen(ms);
4299 
4300 	if (len > DLM_RESNAME_MAXLEN) {
4301 		log_error(ls, "receive_remove from %d bad len %d",
4302 			  from_nodeid, len);
4303 		return;
4304 	}
4305 
4306 	dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash));
4307 	if (dir_nodeid != dlm_our_nodeid()) {
4308 		log_error(ls, "receive_remove from %d bad nodeid %d",
4309 			  from_nodeid, dir_nodeid);
4310 		return;
4311 	}
4312 
4313 	/*
4314 	 * Look for inactive rsb, if it's there, free it.
4315 	 * If the rsb is active, it's being used, and we should ignore this
4316 	 * message.  This is an expected race between the dir node sending a
4317 	 * request to the master node at the same time as the master node sends
4318 	 * a remove to the dir node.  The resolution to that race is for the
4319 	 * dir node to ignore the remove message, and the master node to
4320 	 * recreate the master rsb when it gets a request from the dir node for
4321 	 * an rsb it doesn't have.
4322 	 */
4323 
4324 	memset(name, 0, sizeof(name));
4325 	memcpy(name, ms->m_extra, len);
4326 
4327 	rcu_read_lock();
4328 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
4329 	if (rv) {
4330 		rcu_read_unlock();
4331 		/* should not happen */
4332 		log_error(ls, "%s from %d not found %s", __func__,
4333 			  from_nodeid, name);
4334 		return;
4335 	}
4336 
4337 	write_lock_bh(&ls->ls_rsbtbl_lock);
4338 	if (!rsb_flag(r, RSB_HASHED)) {
4339 		rcu_read_unlock();
4340 		write_unlock_bh(&ls->ls_rsbtbl_lock);
4341 		/* should not happen */
4342 		log_error(ls, "%s from %d got removed during removal %s",
4343 			  __func__, from_nodeid, name);
4344 		return;
4345 	}
4346 	/* at this stage the rsb can only being freed here */
4347 	rcu_read_unlock();
4348 
4349 	if (!rsb_flag(r, RSB_INACTIVE)) {
4350 		if (r->res_master_nodeid != from_nodeid) {
4351 			/* should not happen */
4352 			log_error(ls, "receive_remove on active rsb from %d master %d",
4353 				  from_nodeid, r->res_master_nodeid);
4354 			dlm_print_rsb(r);
4355 			write_unlock_bh(&ls->ls_rsbtbl_lock);
4356 			return;
4357 		}
4358 
4359 		/* Ignore the remove message, see race comment above. */
4360 
4361 		log_debug(ls, "receive_remove from %d master %d first %x %s",
4362 			  from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4363 			  name);
4364 		write_unlock_bh(&ls->ls_rsbtbl_lock);
4365 		return;
4366 	}
4367 
4368 	if (r->res_master_nodeid != from_nodeid) {
4369 		log_error(ls, "receive_remove inactive from %d master %d",
4370 			  from_nodeid, r->res_master_nodeid);
4371 		dlm_print_rsb(r);
4372 		write_unlock_bh(&ls->ls_rsbtbl_lock);
4373 		return;
4374 	}
4375 
4376 	list_del(&r->res_slow_list);
4377 	rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
4378 			       dlm_rhash_rsb_params);
4379 	rsb_clear_flag(r, RSB_HASHED);
4380 	write_unlock_bh(&ls->ls_rsbtbl_lock);
4381 
4382 	free_inactive_rsb(r);
4383 }
4384 
4385 static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms)
4386 {
4387 	do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
4388 }
4389 
4390 static int receive_request_reply(struct dlm_ls *ls,
4391 				 const struct dlm_message *ms)
4392 {
4393 	struct dlm_lkb *lkb;
4394 	struct dlm_rsb *r;
4395 	int error, mstype, result;
4396 	int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4397 
4398 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4399 	if (error)
4400 		return error;
4401 
4402 	r = lkb->lkb_resource;
4403 	hold_rsb(r);
4404 	lock_rsb(r);
4405 
4406 	error = validate_message(lkb, ms);
4407 	if (error)
4408 		goto out;
4409 
4410 	mstype = lkb->lkb_wait_type;
4411 	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4412 	if (error) {
4413 		log_error(ls, "receive_request_reply %x remote %d %x result %d",
4414 			  lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid),
4415 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4416 		dlm_dump_rsb(r);
4417 		goto out;
4418 	}
4419 
4420 	/* Optimization: the dir node was also the master, so it took our
4421 	   lookup as a request and sent request reply instead of lookup reply */
4422 	if (mstype == DLM_MSG_LOOKUP) {
4423 		r->res_master_nodeid = from_nodeid;
4424 		r->res_nodeid = from_nodeid;
4425 		lkb->lkb_nodeid = from_nodeid;
4426 	}
4427 
4428 	/* this is the value returned from do_request() on the master */
4429 	result = from_dlm_errno(le32_to_cpu(ms->m_result));
4430 
4431 	switch (result) {
4432 	case -EAGAIN:
4433 		/* request would block (be queued) on remote master */
4434 		queue_cast(r, lkb, -EAGAIN);
4435 		confirm_master(r, -EAGAIN);
4436 		unhold_lkb(lkb); /* undoes create_lkb() */
4437 		break;
4438 
4439 	case -EINPROGRESS:
4440 	case 0:
4441 		/* request was queued or granted on remote master */
4442 		receive_flags_reply(lkb, ms, false);
4443 		lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
4444 		if (is_altmode(lkb))
4445 			munge_altmode(lkb, ms);
4446 		if (result) {
4447 			add_lkb(r, lkb, DLM_LKSTS_WAITING);
4448 		} else {
4449 			grant_lock_pc(r, lkb, ms);
4450 			queue_cast(r, lkb, 0);
4451 		}
4452 		confirm_master(r, result);
4453 		break;
4454 
4455 	case -EBADR:
4456 	case -ENOTBLK:
4457 		/* find_rsb failed to find rsb or rsb wasn't master */
4458 		log_limit(ls, "receive_request_reply %x from %d %d "
4459 			  "master %d dir %d first %x %s", lkb->lkb_id,
4460 			  from_nodeid, result, r->res_master_nodeid,
4461 			  r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4462 
4463 		if (r->res_dir_nodeid != dlm_our_nodeid() &&
4464 		    r->res_master_nodeid != dlm_our_nodeid()) {
4465 			/* cause _request_lock->set_master->send_lookup */
4466 			r->res_master_nodeid = 0;
4467 			r->res_nodeid = -1;
4468 			lkb->lkb_nodeid = -1;
4469 		}
4470 
4471 		if (is_overlap(lkb)) {
4472 			/* we'll ignore error in cancel/unlock reply */
4473 			queue_cast_overlap(r, lkb);
4474 			confirm_master(r, result);
4475 			unhold_lkb(lkb); /* undoes create_lkb() */
4476 		} else {
4477 			_request_lock(r, lkb);
4478 
4479 			if (r->res_master_nodeid == dlm_our_nodeid())
4480 				confirm_master(r, 0);
4481 		}
4482 		break;
4483 
4484 	default:
4485 		log_error(ls, "receive_request_reply %x error %d",
4486 			  lkb->lkb_id, result);
4487 	}
4488 
4489 	if ((result == 0 || result == -EINPROGRESS) &&
4490 	    test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
4491 		log_debug(ls, "receive_request_reply %x result %d unlock",
4492 			  lkb->lkb_id, result);
4493 		clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4494 		send_unlock(r, lkb);
4495 	} else if ((result == -EINPROGRESS) &&
4496 		   test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
4497 				      &lkb->lkb_iflags)) {
4498 		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4499 		clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4500 		send_cancel(r, lkb);
4501 	} else {
4502 		clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4503 		clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4504 	}
4505  out:
4506 	unlock_rsb(r);
4507 	put_rsb(r);
4508 	dlm_put_lkb(lkb);
4509 	return 0;
4510 }
4511 
4512 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4513 				    const struct dlm_message *ms, bool local)
4514 {
4515 	/* this is the value returned from do_convert() on the master */
4516 	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4517 	case -EAGAIN:
4518 		/* convert would block (be queued) on remote master */
4519 		queue_cast(r, lkb, -EAGAIN);
4520 		break;
4521 
4522 	case -EDEADLK:
4523 		receive_flags_reply(lkb, ms, local);
4524 		revert_lock_pc(r, lkb);
4525 		queue_cast(r, lkb, -EDEADLK);
4526 		break;
4527 
4528 	case -EINPROGRESS:
4529 		/* convert was queued on remote master */
4530 		receive_flags_reply(lkb, ms, local);
4531 		if (is_demoted(lkb))
4532 			munge_demoted(lkb);
4533 		del_lkb(r, lkb);
4534 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4535 		break;
4536 
4537 	case 0:
4538 		/* convert was granted on remote master */
4539 		receive_flags_reply(lkb, ms, local);
4540 		if (is_demoted(lkb))
4541 			munge_demoted(lkb);
4542 		grant_lock_pc(r, lkb, ms);
4543 		queue_cast(r, lkb, 0);
4544 		break;
4545 
4546 	default:
4547 		log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4548 			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4549 			  le32_to_cpu(ms->m_lkid),
4550 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4551 		dlm_print_rsb(r);
4552 		dlm_print_lkb(lkb);
4553 	}
4554 }
4555 
4556 static void _receive_convert_reply(struct dlm_lkb *lkb,
4557 				   const struct dlm_message *ms, bool local)
4558 {
4559 	struct dlm_rsb *r = lkb->lkb_resource;
4560 	int error;
4561 
4562 	hold_rsb(r);
4563 	lock_rsb(r);
4564 
4565 	error = validate_message(lkb, ms);
4566 	if (error)
4567 		goto out;
4568 
4569 	error = remove_from_waiters_ms(lkb, ms, local);
4570 	if (error)
4571 		goto out;
4572 
4573 	__receive_convert_reply(r, lkb, ms, local);
4574  out:
4575 	unlock_rsb(r);
4576 	put_rsb(r);
4577 }
4578 
4579 static int receive_convert_reply(struct dlm_ls *ls,
4580 				 const struct dlm_message *ms)
4581 {
4582 	struct dlm_lkb *lkb;
4583 	int error;
4584 
4585 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4586 	if (error)
4587 		return error;
4588 
4589 	_receive_convert_reply(lkb, ms, false);
4590 	dlm_put_lkb(lkb);
4591 	return 0;
4592 }
4593 
4594 static void _receive_unlock_reply(struct dlm_lkb *lkb,
4595 				  const struct dlm_message *ms, bool local)
4596 {
4597 	struct dlm_rsb *r = lkb->lkb_resource;
4598 	int error;
4599 
4600 	hold_rsb(r);
4601 	lock_rsb(r);
4602 
4603 	error = validate_message(lkb, ms);
4604 	if (error)
4605 		goto out;
4606 
4607 	error = remove_from_waiters_ms(lkb, ms, local);
4608 	if (error)
4609 		goto out;
4610 
4611 	/* this is the value returned from do_unlock() on the master */
4612 
4613 	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4614 	case -DLM_EUNLOCK:
4615 		receive_flags_reply(lkb, ms, local);
4616 		remove_lock_pc(r, lkb);
4617 		queue_cast(r, lkb, -DLM_EUNLOCK);
4618 		break;
4619 	case -ENOENT:
4620 		break;
4621 	default:
4622 		log_error(r->res_ls, "receive_unlock_reply %x error %d",
4623 			  lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
4624 	}
4625  out:
4626 	unlock_rsb(r);
4627 	put_rsb(r);
4628 }
4629 
4630 static int receive_unlock_reply(struct dlm_ls *ls,
4631 				const struct dlm_message *ms)
4632 {
4633 	struct dlm_lkb *lkb;
4634 	int error;
4635 
4636 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4637 	if (error)
4638 		return error;
4639 
4640 	_receive_unlock_reply(lkb, ms, false);
4641 	dlm_put_lkb(lkb);
4642 	return 0;
4643 }
4644 
4645 static void _receive_cancel_reply(struct dlm_lkb *lkb,
4646 				  const struct dlm_message *ms, bool local)
4647 {
4648 	struct dlm_rsb *r = lkb->lkb_resource;
4649 	int error;
4650 
4651 	hold_rsb(r);
4652 	lock_rsb(r);
4653 
4654 	error = validate_message(lkb, ms);
4655 	if (error)
4656 		goto out;
4657 
4658 	error = remove_from_waiters_ms(lkb, ms, local);
4659 	if (error)
4660 		goto out;
4661 
4662 	/* this is the value returned from do_cancel() on the master */
4663 
4664 	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4665 	case -DLM_ECANCEL:
4666 		receive_flags_reply(lkb, ms, local);
4667 		revert_lock_pc(r, lkb);
4668 		queue_cast(r, lkb, -DLM_ECANCEL);
4669 		break;
4670 	case 0:
4671 		break;
4672 	default:
4673 		log_error(r->res_ls, "receive_cancel_reply %x error %d",
4674 			  lkb->lkb_id,
4675 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4676 	}
4677  out:
4678 	unlock_rsb(r);
4679 	put_rsb(r);
4680 }
4681 
4682 static int receive_cancel_reply(struct dlm_ls *ls,
4683 				const struct dlm_message *ms)
4684 {
4685 	struct dlm_lkb *lkb;
4686 	int error;
4687 
4688 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4689 	if (error)
4690 		return error;
4691 
4692 	_receive_cancel_reply(lkb, ms, false);
4693 	dlm_put_lkb(lkb);
4694 	return 0;
4695 }
4696 
4697 static void receive_lookup_reply(struct dlm_ls *ls,
4698 				 const struct dlm_message *ms)
4699 {
4700 	struct dlm_lkb *lkb;
4701 	struct dlm_rsb *r;
4702 	int error, ret_nodeid;
4703 	int do_lookup_list = 0;
4704 
4705 	error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
4706 	if (error) {
4707 		log_error(ls, "%s no lkid %x", __func__,
4708 			  le32_to_cpu(ms->m_lkid));
4709 		return;
4710 	}
4711 
4712 	/* ms->m_result is the value returned by dlm_master_lookup on dir node
4713 	   FIXME: will a non-zero error ever be returned? */
4714 
4715 	r = lkb->lkb_resource;
4716 	hold_rsb(r);
4717 	lock_rsb(r);
4718 
4719 	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4720 	if (error)
4721 		goto out;
4722 
4723 	ret_nodeid = le32_to_cpu(ms->m_nodeid);
4724 
4725 	/* We sometimes receive a request from the dir node for this
4726 	   rsb before we've received the dir node's loookup_reply for it.
4727 	   The request from the dir node implies we're the master, so we set
4728 	   ourself as master in receive_request_reply, and verify here that
4729 	   we are indeed the master. */
4730 
4731 	if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4732 		/* This should never happen */
4733 		log_error(ls, "receive_lookup_reply %x from %d ret %d "
4734 			  "master %d dir %d our %d first %x %s",
4735 			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4736 			  ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
4737 			  dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4738 	}
4739 
4740 	if (ret_nodeid == dlm_our_nodeid()) {
4741 		r->res_master_nodeid = ret_nodeid;
4742 		r->res_nodeid = 0;
4743 		do_lookup_list = 1;
4744 		r->res_first_lkid = 0;
4745 	} else if (ret_nodeid == -1) {
4746 		/* the remote node doesn't believe it's the dir node */
4747 		log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4748 			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
4749 		r->res_master_nodeid = 0;
4750 		r->res_nodeid = -1;
4751 		lkb->lkb_nodeid = -1;
4752 	} else {
4753 		/* set_master() will set lkb_nodeid from r */
4754 		r->res_master_nodeid = ret_nodeid;
4755 		r->res_nodeid = ret_nodeid;
4756 	}
4757 
4758 	if (is_overlap(lkb)) {
4759 		log_debug(ls, "receive_lookup_reply %x unlock %x",
4760 			  lkb->lkb_id, dlm_iflags_val(lkb));
4761 		queue_cast_overlap(r, lkb);
4762 		unhold_lkb(lkb); /* undoes create_lkb() */
4763 		goto out_list;
4764 	}
4765 
4766 	_request_lock(r, lkb);
4767 
4768  out_list:
4769 	if (do_lookup_list)
4770 		process_lookup_list(r);
4771  out:
4772 	unlock_rsb(r);
4773 	put_rsb(r);
4774 	dlm_put_lkb(lkb);
4775 }
4776 
4777 static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4778 			     uint32_t saved_seq)
4779 {
4780 	int error = 0, noent = 0;
4781 
4782 	if (WARN_ON_ONCE(!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid)))) {
4783 		log_limit(ls, "receive %d from non-member %d %x %x %d",
4784 			  le32_to_cpu(ms->m_type),
4785 			  le32_to_cpu(ms->m_header.h_nodeid),
4786 			  le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4787 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4788 		return;
4789 	}
4790 
4791 	switch (ms->m_type) {
4792 
4793 	/* messages sent to a master node */
4794 
4795 	case cpu_to_le32(DLM_MSG_REQUEST):
4796 		error = receive_request(ls, ms);
4797 		break;
4798 
4799 	case cpu_to_le32(DLM_MSG_CONVERT):
4800 		error = receive_convert(ls, ms);
4801 		break;
4802 
4803 	case cpu_to_le32(DLM_MSG_UNLOCK):
4804 		error = receive_unlock(ls, ms);
4805 		break;
4806 
4807 	case cpu_to_le32(DLM_MSG_CANCEL):
4808 		noent = 1;
4809 		error = receive_cancel(ls, ms);
4810 		break;
4811 
4812 	/* messages sent from a master node (replies to above) */
4813 
4814 	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
4815 		error = receive_request_reply(ls, ms);
4816 		break;
4817 
4818 	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
4819 		error = receive_convert_reply(ls, ms);
4820 		break;
4821 
4822 	case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
4823 		error = receive_unlock_reply(ls, ms);
4824 		break;
4825 
4826 	case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
4827 		error = receive_cancel_reply(ls, ms);
4828 		break;
4829 
4830 	/* messages sent from a master node (only two types of async msg) */
4831 
4832 	case cpu_to_le32(DLM_MSG_GRANT):
4833 		noent = 1;
4834 		error = receive_grant(ls, ms);
4835 		break;
4836 
4837 	case cpu_to_le32(DLM_MSG_BAST):
4838 		noent = 1;
4839 		error = receive_bast(ls, ms);
4840 		break;
4841 
4842 	/* messages sent to a dir node */
4843 
4844 	case cpu_to_le32(DLM_MSG_LOOKUP):
4845 		receive_lookup(ls, ms);
4846 		break;
4847 
4848 	case cpu_to_le32(DLM_MSG_REMOVE):
4849 		receive_remove(ls, ms);
4850 		break;
4851 
4852 	/* messages sent from a dir node (remove has no reply) */
4853 
4854 	case cpu_to_le32(DLM_MSG_LOOKUP_REPLY):
4855 		receive_lookup_reply(ls, ms);
4856 		break;
4857 
4858 	/* other messages */
4859 
4860 	case cpu_to_le32(DLM_MSG_PURGE):
4861 		receive_purge(ls, ms);
4862 		break;
4863 
4864 	default:
4865 		log_error(ls, "unknown message type %d",
4866 			  le32_to_cpu(ms->m_type));
4867 	}
4868 
4869 	/*
4870 	 * When checking for ENOENT, we're checking the result of
4871 	 * find_lkb(m_remid):
4872 	 *
4873 	 * The lock id referenced in the message wasn't found.  This may
4874 	 * happen in normal usage for the async messages and cancel, so
4875 	 * only use log_debug for them.
4876 	 *
4877 	 * Some errors are expected and normal.
4878 	 */
4879 
4880 	if (error == -ENOENT && noent) {
4881 		log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4882 			  le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4883 			  le32_to_cpu(ms->m_header.h_nodeid),
4884 			  le32_to_cpu(ms->m_lkid), saved_seq);
4885 	} else if (error == -ENOENT) {
4886 		log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4887 			  le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4888 			  le32_to_cpu(ms->m_header.h_nodeid),
4889 			  le32_to_cpu(ms->m_lkid), saved_seq);
4890 
4891 		if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT))
4892 			dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash));
4893 	}
4894 
4895 	if (error == -EINVAL) {
4896 		log_error(ls, "receive %d inval from %d lkid %x remid %x "
4897 			  "saved_seq %u",
4898 			  le32_to_cpu(ms->m_type),
4899 			  le32_to_cpu(ms->m_header.h_nodeid),
4900 			  le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4901 			  saved_seq);
4902 	}
4903 }
4904 
4905 /* If the lockspace is in recovery mode (locking stopped), then normal
4906    messages are saved on the requestqueue for processing after recovery is
4907    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4908    messages off the requestqueue before we process new ones. This occurs right
4909    after recovery completes when we transition from saving all messages on
4910    requestqueue, to processing all the saved messages, to processing new
4911    messages as they arrive. */
4912 
4913 static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4914 				int nodeid)
4915 {
4916 try_again:
4917 	read_lock_bh(&ls->ls_requestqueue_lock);
4918 	if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4919 		/* If we were a member of this lockspace, left, and rejoined,
4920 		   other nodes may still be sending us messages from the
4921 		   lockspace generation before we left. */
4922 		if (WARN_ON_ONCE(!ls->ls_generation)) {
4923 			read_unlock_bh(&ls->ls_requestqueue_lock);
4924 			log_limit(ls, "receive %d from %d ignore old gen",
4925 				  le32_to_cpu(ms->m_type), nodeid);
4926 			return;
4927 		}
4928 
4929 		read_unlock_bh(&ls->ls_requestqueue_lock);
4930 		write_lock_bh(&ls->ls_requestqueue_lock);
4931 		/* recheck because we hold writelock now */
4932 		if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4933 			write_unlock_bh(&ls->ls_requestqueue_lock);
4934 			goto try_again;
4935 		}
4936 
4937 		dlm_add_requestqueue(ls, nodeid, ms);
4938 		write_unlock_bh(&ls->ls_requestqueue_lock);
4939 	} else {
4940 		_receive_message(ls, ms, 0);
4941 		read_unlock_bh(&ls->ls_requestqueue_lock);
4942 	}
4943 }
4944 
4945 /* This is called by dlm_recoverd to process messages that were saved on
4946    the requestqueue. */
4947 
4948 void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
4949 			       uint32_t saved_seq)
4950 {
4951 	_receive_message(ls, ms, saved_seq);
4952 }
4953 
4954 /* This is called by the midcomms layer when something is received for
4955    the lockspace.  It could be either a MSG (normal message sent as part of
4956    standard locking activity) or an RCOM (recovery message sent as part of
4957    lockspace recovery). */
4958 
4959 void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
4960 {
4961 	const struct dlm_header *hd = &p->header;
4962 	struct dlm_ls *ls;
4963 	int type = 0;
4964 
4965 	switch (hd->h_cmd) {
4966 	case DLM_MSG:
4967 		type = le32_to_cpu(p->message.m_type);
4968 		break;
4969 	case DLM_RCOM:
4970 		type = le32_to_cpu(p->rcom.rc_type);
4971 		break;
4972 	default:
4973 		log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4974 		return;
4975 	}
4976 
4977 	if (le32_to_cpu(hd->h_nodeid) != nodeid) {
4978 		log_print("invalid h_nodeid %d from %d lockspace %x",
4979 			  le32_to_cpu(hd->h_nodeid), nodeid,
4980 			  le32_to_cpu(hd->u.h_lockspace));
4981 		return;
4982 	}
4983 
4984 	ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace));
4985 	if (!ls) {
4986 		if (dlm_config.ci_log_debug) {
4987 			printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
4988 				"%u from %d cmd %d type %d\n",
4989 				le32_to_cpu(hd->u.h_lockspace), nodeid,
4990 				hd->h_cmd, type);
4991 		}
4992 
4993 		if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4994 			dlm_send_ls_not_ready(nodeid, &p->rcom);
4995 		return;
4996 	}
4997 
4998 	/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4999 	   be inactive (in this ls) before transitioning to recovery mode */
5000 
5001 	read_lock_bh(&ls->ls_recv_active);
5002 	if (hd->h_cmd == DLM_MSG)
5003 		dlm_receive_message(ls, &p->message, nodeid);
5004 	else if (hd->h_cmd == DLM_RCOM)
5005 		dlm_receive_rcom(ls, &p->rcom, nodeid);
5006 	else
5007 		log_error(ls, "invalid h_cmd %d from %d lockspace %x",
5008 			  hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
5009 	read_unlock_bh(&ls->ls_recv_active);
5010 
5011 	dlm_put_lockspace(ls);
5012 }
5013 
5014 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
5015 				   struct dlm_message *ms_local)
5016 {
5017 	if (middle_conversion(lkb)) {
5018 		log_rinfo(ls, "%s %x middle convert in progress", __func__,
5019 			 lkb->lkb_id);
5020 
5021 		/* We sent this lock to the new master. The new master will
5022 		 * tell us when it's granted.  We no longer need a reply, so
5023 		 * use a fake reply to put the lkb into the right state.
5024 		 */
5025 		hold_lkb(lkb);
5026 		memset(ms_local, 0, sizeof(struct dlm_message));
5027 		ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
5028 		ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
5029 		ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5030 		_receive_convert_reply(lkb, ms_local, true);
5031 		unhold_lkb(lkb);
5032 
5033 	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
5034 		set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5035 	}
5036 
5037 	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5038 	   conversions are async; there's no reply from the remote master */
5039 }
5040 
5041 /* A waiting lkb needs recovery if the master node has failed, or
5042    the master node is changing (only when no directory is used) */
5043 
5044 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5045 				 int dir_nodeid)
5046 {
5047 	if (dlm_no_directory(ls))
5048 		return 1;
5049 
5050 	if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5051 		return 1;
5052 
5053 	return 0;
5054 }
5055 
5056 /* Recovery for locks that are waiting for replies from nodes that are now
5057    gone.  We can just complete unlocks and cancels by faking a reply from the
5058    dead node.  Requests and up-conversions we flag to be resent after
5059    recovery.  Down-conversions can just be completed with a fake reply like
5060    unlocks.  Conversions between PR and CW need special attention. */
5061 
/*
 * Walk ls->ls_waiters and, for each lkb still waiting on a remote reply,
 * either flag it for resend or complete it with a locally built reply:
 *
 *  - DLM_MSG_LOOKUP waiters are always flagged with DLM_IFL_RESEND_BIT;
 *  - waiters for which waiter_needs_recovery() returns false are skipped;
 *  - DLM_MSG_REQUEST waiters are flagged with DLM_IFL_RESEND_BIT;
 *  - DLM_MSG_CONVERT waiters are handed to recover_convert_waiter();
 *  - DLM_MSG_UNLOCK and DLM_MSG_CANCEL waiters are completed by passing a
 *    faked reply to _receive_unlock_reply() / _receive_cancel_reply().
 *
 * A single scratch dlm_message is allocated up front and reused for every
 * faked reply.  If that allocation fails the function returns without
 * looking at any waiter.
 */
void dlm_recover_waiters_pre(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb, *safe;
	struct dlm_message *ms_local;
	int wait_type, local_unlock_result, local_cancel_result;
	int dir_nodeid;

	/* scratch message reused for each faked reply below */
	ms_local = kmalloc(sizeof(*ms_local), GFP_KERNEL);
	if (!ms_local)
		return;

	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {

		dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);

		/* exclude debug messages about unlocks because there can be so
		   many and they aren't very interesting */

		if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
			log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
				  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
				  lkb->lkb_id,
				  lkb->lkb_remid,
				  lkb->lkb_wait_type,
				  lkb->lkb_resource->res_nodeid,
				  lkb->lkb_nodeid,
				  lkb->lkb_wait_nodeid,
				  dir_nodeid);
		}

		/* all outstanding lookups, regardless of destination, will be
		   resent after recovery is done */

		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
			set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
			continue;
		}

		if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
			continue;

		/* default results placed in a faked unlock/cancel reply; the
		   overlap handling below may replace them */
		wait_type = lkb->lkb_wait_type;
		local_unlock_result = -DLM_EUNLOCK;
		local_cancel_result = -DLM_ECANCEL;

		/* Main reply may have been received leaving a zero wait_type,
		   but a reply for the overlapping op may not have been
		   received.  In that case we need to fake the appropriate
		   reply for the overlap op. */

		if (!wait_type) {
			if (is_overlap_cancel(lkb)) {
				wait_type = DLM_MSG_CANCEL;
				if (lkb->lkb_grmode == DLM_LOCK_IV)
					local_cancel_result = 0;
			}
			/* checked second, so an overlapping unlock takes
			   precedence when both overlap flags are set */
			if (is_overlap_unlock(lkb)) {
				wait_type = DLM_MSG_UNLOCK;
				if (lkb->lkb_grmode == DLM_LOCK_IV)
					local_unlock_result = -ENOENT;
			}

			log_debug(ls, "rwpre overlap %x %x %d %d %d",
				  lkb->lkb_id, dlm_iflags_val(lkb), wait_type,
				  local_cancel_result, local_unlock_result);
		}

		switch (wait_type) {

		case DLM_MSG_REQUEST:
			set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
			break;

		case DLM_MSG_CONVERT:
			recover_convert_waiter(ls, lkb, ms_local);
			break;

		case DLM_MSG_UNLOCK:
			/* extra reference held across the faked reply and
			   dropped again by dlm_put_lkb() below */
			hold_lkb(lkb);
			memset(ms_local, 0, sizeof(struct dlm_message));
			ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
			ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result));
			ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
			_receive_unlock_reply(lkb, ms_local, true);
			dlm_put_lkb(lkb);
			break;

		case DLM_MSG_CANCEL:
			/* same pattern as the unlock case, with a cancel reply */
			hold_lkb(lkb);
			memset(ms_local, 0, sizeof(struct dlm_message));
			ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
			ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result));
			ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
			_receive_cancel_reply(lkb, ms_local, true);
			dlm_put_lkb(lkb);
			break;

		default:
			/* logs both the stored wait_type and the one derived
			   from the overlap flags */
			log_error(ls, "invalid lkb wait_type %d %d",
				  lkb->lkb_wait_type, wait_type);
		}
		schedule();
	}
	kfree(ms_local);
}
5167 
5168 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5169 {
5170 	struct dlm_lkb *lkb = NULL, *iter;
5171 
5172 	spin_lock_bh(&ls->ls_waiters_lock);
5173 	list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
5174 		if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
5175 			hold_lkb(iter);
5176 			lkb = iter;
5177 			break;
5178 		}
5179 	}
5180 	spin_unlock_bh(&ls->ls_waiters_lock);
5181 
5182 	return lkb;
5183 }
5184 
5185 /*
5186  * Forced state reset for locks that were in the middle of remote operations
5187  * when recovery happened (i.e. lkbs that were on the waiters list, waiting
5188  * for a reply from a remote operation.)  The lkbs remaining on the waiters
5189  * list need to be reevaluated; some may need resending to a different node
5190  * than previously, and some may now need local handling rather than remote.
5191  *
5192  * First, the lkb state for the voided remote operation is forcibly reset,
5193  * equivalent to what remove_from_waiters() would normally do:
5194  * . lkb removed from ls_waiters list
5195  * . lkb wait_type cleared
5196  * . lkb waiters_count cleared
5197  * . lkb ref count decremented for each waiters_count (almost always 1,
5198  *   but possibly 2 in case of cancel/unlock overlapping, which means
5199  *   two remote replies were being expected for the lkb.)
5200  *
5201  * Second, the lkb is reprocessed like an original operation would be,
5202  * by passing it to _request_lock or _convert_lock, which will either
5203  * process the lkb operation locally, or send it to a remote node again
5204  * and put the lkb back onto the waiters list.
5205  *
5206  * When reprocessing the lkb, we may find that it's flagged for an overlapping
5207  * force-unlock or cancel, either from before recovery began, or after recovery
5208  * finished.  If this is the case, the unlock/cancel is done directly, and the
5209  * original operation is not initiated again (no _request_lock/_convert_lock.)
5210  */
5211 
/*
 * Repeatedly pick a waiter flagged DLM_IFL_RESEND_BIT (find_resend_waiter),
 * forcibly clear its waiters state, and then either reprocess it through
 * _request_lock()/_convert_lock() or, when an overlap cancel/unlock flag was
 * set, complete it as a cancel/unlock instead.
 *
 * Returns 0 once no flagged waiter remains, or -EINTR if
 * dlm_locking_stopped() reports true at the top of an iteration.
 */
int dlm_recover_waiters_post(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error = 0, mstype, err, oc, ou;

	while (1) {
		if (dlm_locking_stopped(ls)) {
			log_debug(ls, "recover_waiters_post aborted");
			error = -EINTR;
			break;
		}

		/*
		 * Find an lkb from the waiters list that's been affected by
		 * recovery node changes, and needs to be reprocessed.  Does
		 * hold_lkb(), adding a refcount.
		 */
		lkb = find_resend_waiter(ls);
		if (!lkb)
			break;

		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		/*
		 * If the lkb has been flagged for a force unlock or cancel,
		 * then the reprocessing below will be replaced by just doing
		 * the unlock/cancel directly.
		 */
		/* mstype is sampled before lkb_wait_type is zeroed below */
		mstype = lkb->lkb_wait_type;
		oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
					&lkb->lkb_iflags);
		ou = test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT,
					&lkb->lkb_iflags);
		err = 0;

		log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
			  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
			  "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
			  r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
			  dlm_dir_nodeid(r), oc, ou);

		/*
		 * No reply to the pre-recovery operation will now be received,
		 * so a forced equivalent of remove_from_waiters() is needed to
		 * reset the waiters state that was in place before recovery.
		 */

		clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);

		/* Forcibly clear wait_type */
		lkb->lkb_wait_type = 0;

		/*
		 * Forcibly reset wait_count and associated refcount.  The
		 * wait_count will almost always be 1, but in case of an
		 * overlapping unlock/cancel it could be 2: see where
		 * add_to_waiters() finds the lkb is already on the waiters
		 * list and does lkb_wait_count++; hold_lkb().
		 */
		while (lkb->lkb_wait_count) {
			lkb->lkb_wait_count--;
			unhold_lkb(lkb);
		}

		/* Forcibly remove from waiters list */
		spin_lock_bh(&ls->ls_waiters_lock);
		list_del_init(&lkb->lkb_wait_reply);
		spin_unlock_bh(&ls->ls_waiters_lock);

		/*
		 * The lkb is now clear of all prior waiters state and can be
		 * processed locally, or sent to remote node again, or directly
		 * cancelled/unlocked.
		 */

		if (oc || ou) {
			/* do an unlock or cancel instead of resending */
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				/* unlock result is used when both overlap
				   flags were set */
				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
							-DLM_ECANCEL);
				unhold_lkb(lkb); /* undoes create_lkb() */
				break;
			case DLM_MSG_CONVERT:
				if (oc) {
					queue_cast(r, lkb, -DLM_ECANCEL);
				} else {
					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
					_unlock_lock(r, lkb);
				}
				break;
			default:
				/* unexpected message type, reported below */
				err = 1;
			}
		} else {
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				_request_lock(r, lkb);
				if (r->res_nodeid != -1 && is_master(r))
					confirm_master(r, 0);
				break;
			case DLM_MSG_CONVERT:
				_convert_lock(r, lkb);
				break;
			default:
				/* unexpected message type, reported below */
				err = 1;
			}
		}

		if (err) {
			log_error(ls, "waiter %x msg %d r_nodeid %d "
				  "dir_nodeid %d overlap %d %d",
				  lkb->lkb_id, mstype, r->res_nodeid,
				  dlm_dir_nodeid(r), oc, ou);
		}
		unlock_rsb(r);
		put_rsb(r);
		/* drops the reference taken by find_resend_waiter() */
		dlm_put_lkb(lkb);
	}

	return error;
}
5339 
5340 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5341 			      struct list_head *list)
5342 {
5343 	struct dlm_lkb *lkb, *safe;
5344 
5345 	list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5346 		if (!is_master_copy(lkb))
5347 			continue;
5348 
5349 		/* don't purge lkbs we've added in recover_master_copy for
5350 		   the current recovery seq */
5351 
5352 		if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5353 			continue;
5354 
5355 		del_lkb(r, lkb);
5356 
5357 		/* this put should free the lkb */
5358 		if (!dlm_put_lkb(lkb))
5359 			log_error(ls, "purged mstcpy lkb not released");
5360 	}
5361 }
5362 
5363 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5364 {
5365 	struct dlm_ls *ls = r->res_ls;
5366 
5367 	purge_mstcpy_list(ls, r, &r->res_grantqueue);
5368 	purge_mstcpy_list(ls, r, &r->res_convertqueue);
5369 	purge_mstcpy_list(ls, r, &r->res_waitqueue);
5370 }
5371 
5372 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5373 			    struct list_head *list,
5374 			    int nodeid_gone, unsigned int *count)
5375 {
5376 	struct dlm_lkb *lkb, *safe;
5377 
5378 	list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5379 		if (!is_master_copy(lkb))
5380 			continue;
5381 
5382 		if ((lkb->lkb_nodeid == nodeid_gone) ||
5383 		    dlm_is_removed(ls, lkb->lkb_nodeid)) {
5384 
5385 			/* tell recover_lvb to invalidate the lvb
5386 			   because a node holding EX/PW failed */
5387 			if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5388 			    (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5389 				rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5390 			}
5391 
5392 			del_lkb(r, lkb);
5393 
5394 			/* this put should free the lkb */
5395 			if (!dlm_put_lkb(lkb))
5396 				log_error(ls, "purged dead lkb not released");
5397 
5398 			rsb_set_flag(r, RSB_RECOVER_GRANT);
5399 
5400 			(*count)++;
5401 		}
5402 	}
5403 }
5404 
5405 /* Get rid of locks held by nodes that are gone. */
5406 
5407 void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list)
5408 {
5409 	struct dlm_rsb *r;
5410 	struct dlm_member *memb;
5411 	int nodes_count = 0;
5412 	int nodeid_gone = 0;
5413 	unsigned int lkb_count = 0;
5414 
5415 	/* cache one removed nodeid to optimize the common
5416 	   case of a single node removed */
5417 
5418 	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5419 		nodes_count++;
5420 		nodeid_gone = memb->nodeid;
5421 	}
5422 
5423 	if (!nodes_count)
5424 		return;
5425 
5426 	list_for_each_entry(r, root_list, res_root_list) {
5427 		lock_rsb(r);
5428 		if (r->res_nodeid != -1 && is_master(r)) {
5429 			purge_dead_list(ls, r, &r->res_grantqueue,
5430 					nodeid_gone, &lkb_count);
5431 			purge_dead_list(ls, r, &r->res_convertqueue,
5432 					nodeid_gone, &lkb_count);
5433 			purge_dead_list(ls, r, &r->res_waitqueue,
5434 					nodeid_gone, &lkb_count);
5435 		}
5436 		unlock_rsb(r);
5437 
5438 		cond_resched();
5439 	}
5440 
5441 	if (lkb_count)
5442 		log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5443 			  lkb_count, nodes_count);
5444 }
5445 
5446 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
5447 {
5448 	struct dlm_rsb *r;
5449 
5450 	read_lock_bh(&ls->ls_rsbtbl_lock);
5451 	list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
5452 		if (!rsb_flag(r, RSB_RECOVER_GRANT))
5453 			continue;
5454 		if (!is_master(r)) {
5455 			rsb_clear_flag(r, RSB_RECOVER_GRANT);
5456 			continue;
5457 		}
5458 		hold_rsb(r);
5459 		read_unlock_bh(&ls->ls_rsbtbl_lock);
5460 		return r;
5461 	}
5462 	read_unlock_bh(&ls->ls_rsbtbl_lock);
5463 	return NULL;
5464 }
5465 
5466 /*
5467  * Attempt to grant locks on resources that we are the master of.
5468  * Locks may have become grantable during recovery because locks
5469  * from departed nodes have been purged (or not rebuilt), allowing
5470  * previously blocked locks to now be granted.  The subset of rsb's
5471  * we are interested in are those with lkb's on either the convert or
5472  * waiting queues.
5473  *
5474  * Simplest would be to go through each master rsb and check for non-empty
5475  * convert or waiting queues, and attempt to grant on those rsbs.
5476  * Checking the queues requires lock_rsb, though, for which we'd need
5477  * to release the rsbtbl lock.  This would make iterating through all
5478  * rsb's very inefficient.  So, we rely on earlier recovery routines
5479  * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5480  * locks for.
5481  */
5482 
5483 void dlm_recover_grant(struct dlm_ls *ls)
5484 {
5485 	struct dlm_rsb *r;
5486 	unsigned int count = 0;
5487 	unsigned int rsb_count = 0;
5488 	unsigned int lkb_count = 0;
5489 
5490 	while (1) {
5491 		r = find_grant_rsb(ls);
5492 		if (!r)
5493 			break;
5494 
5495 		rsb_count++;
5496 		count = 0;
5497 		lock_rsb(r);
5498 		/* the RECOVER_GRANT flag is checked in the grant path */
5499 		grant_pending_locks(r, &count);
5500 		rsb_clear_flag(r, RSB_RECOVER_GRANT);
5501 		lkb_count += count;
5502 		confirm_master(r, 0);
5503 		unlock_rsb(r);
5504 		put_rsb(r);
5505 		cond_resched();
5506 	}
5507 
5508 	if (lkb_count)
5509 		log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5510 			  lkb_count, rsb_count);
5511 }
5512 
5513 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5514 					 uint32_t remid)
5515 {
5516 	struct dlm_lkb *lkb;
5517 
5518 	list_for_each_entry(lkb, head, lkb_statequeue) {
5519 		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5520 			return lkb;
5521 	}
5522 	return NULL;
5523 }
5524 
5525 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5526 				    uint32_t remid)
5527 {
5528 	struct dlm_lkb *lkb;
5529 
5530 	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5531 	if (lkb)
5532 		return lkb;
5533 	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5534 	if (lkb)
5535 		return lkb;
5536 	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5537 	if (lkb)
5538 		return lkb;
5539 	return NULL;
5540 }
5541 
5542 /* needs at least dlm_rcom + rcom_lock */
5543 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5544 				  struct dlm_rsb *r, const struct dlm_rcom *rc)
5545 {
5546 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5547 
5548 	lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5549 	lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5550 	lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5551 	lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5552 	dlm_set_dflags_val(lkb, le32_to_cpu(rl->rl_flags));
5553 	set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
5554 	lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5555 	lkb->lkb_rqmode = rl->rl_rqmode;
5556 	lkb->lkb_grmode = rl->rl_grmode;
5557 	/* don't set lkb_status because add_lkb wants to itself */
5558 
5559 	lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5560 	lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5561 
5562 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5563 		int lvblen = le16_to_cpu(rc->rc_header.h_length) -
5564 			sizeof(struct dlm_rcom) - sizeof(struct rcom_lock);
5565 		if (lvblen > ls->ls_lvblen)
5566 			return -EINVAL;
5567 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5568 		if (!lkb->lkb_lvbptr)
5569 			return -ENOMEM;
5570 		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5571 	}
5572 
5573 	/* Conversions between PR and CW (middle modes) need special handling.
5574 	   The real granted mode of these converting locks cannot be determined
5575 	   until all locks have been rebuilt on the rsb (recover_conversion) */
5576 
5577 	if (rl->rl_status == DLM_LKSTS_CONVERT && middle_conversion(lkb)) {
5578 		/* We may need to adjust grmode depending on other granted locks. */
5579 		log_limit(ls, "%s %x middle convert gr %d rq %d remote %d %x",
5580 			  __func__, lkb->lkb_id, lkb->lkb_grmode,
5581 			  lkb->lkb_rqmode, lkb->lkb_nodeid, lkb->lkb_remid);
5582 		rsb_set_flag(r, RSB_RECOVER_CONVERT);
5583 	}
5584 
5585 	return 0;
5586 }
5587 
5588 /* This lkb may have been recovered in a previous aborted recovery so we need
5589    to check if the rsb already has an lkb with the given remote nodeid/lkid.
5590    If so we just send back a standard reply.  If not, we create a new lkb with
5591    the given values and send back our lkid.  We send back our lkid by sending
5592    back the rcom_lock struct we got but with the remid field filled in. */
5593 
5594 /* needs at least dlm_rcom + rcom_lock */
/*
 * @ls:        lockspace the rcom message belongs to
 * @rc:        received rcom message; rc->rc_buf is read as a struct rcom_lock
 * @rl_remid:  out: starts as the received rl_remid and is replaced by the
 *             id of the local lkb (little endian) when one is found or
 *             created
 * @rl_result: out: the return value of this function, little endian
 *
 * Returns 0 when a new master-copy lkb was created and added to the rsb,
 * -EEXIST when search_remid() already finds an lkb for the sender's
 * nodeid/lkid, -EOPNOTSUPP when rl_parent_lkid is non-zero, -EBADR when
 * dlm_no_directory() is true and we are not the directory node of the rsb,
 * or the error returned by find_rsb(), create_lkb() or
 * receive_rcom_lock_args().
 */
int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
			    __le32 *rl_remid, __le32 *rl_result)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	uint32_t remid = 0;
	int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
	int error;

	/* init rl_remid with rcom lock rl_remid */
	*rl_remid = rl->rl_remid;

	if (rl->rl_parent_lkid) {
		error = -EOPNOTSUPP;
		goto out;
	}

	remid = le32_to_cpu(rl->rl_lkid);

	/* In general we expect the rsb returned to be R_MASTER, but we don't
	   have to require it.  Recovery of masters on one node can overlap
	   recovery of locks on another node, so one node can send us MSTCPY
	   locks before we've made ourselves master of this rsb.  We can still
	   add new MSTCPY locks that we receive here without any harm; when
	   we make ourselves master, dlm_recover_masters() won't touch the
	   MSTCPY locks we've received early. */

	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
			 from_nodeid, R_RECEIVE_RECOVER, &r);
	if (error)
		goto out;

	lock_rsb(r);

	if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
		log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
			  from_nodeid, remid);
		error = -EBADR;
		goto out_unlock;
	}

	/* an lkb for this sender/lkid may already be on the rsb; report
	   -EEXIST but still return its id through out_remid */
	lkb = search_remid(r, from_nodeid, remid);
	if (lkb) {
		error = -EEXIST;
		goto out_remid;
	}

	error = create_lkb(ls, &lkb);
	if (error)
		goto out_unlock;

	error = receive_rcom_lock_args(ls, lkb, r, rc);
	if (error) {
		/* drop the lkb that create_lkb() just handed us */
		__put_lkb(ls, lkb);
		goto out_unlock;
	}

	attach_lkb(r, lkb);
	add_lkb(r, lkb, rl->rl_status);
	ls->ls_recover_locks_in++;

	if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
		rsb_set_flag(r, RSB_RECOVER_GRANT);

 out_remid:
	/* this is the new value returned to the lock holder for
	   saving in its process-copy lkb */
	*rl_remid = cpu_to_le32(lkb->lkb_id);

	/* stamp the lkb, new or pre-existing, with the current recovery seq */
	lkb->lkb_recover_seq = ls->ls_recover_seq;

 out_unlock:
	unlock_rsb(r);
	put_rsb(r);
 out:
	/* -EEXIST is not logged */
	if (error && error != -EEXIST)
		log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
			  from_nodeid, remid, error);
	*rl_result = cpu_to_le32(error);
	return error;
}
5677 
5678 /* needs at least dlm_rcom + rcom_lock */
5679 int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5680 			     uint64_t seq)
5681 {
5682 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5683 	struct dlm_rsb *r;
5684 	struct dlm_lkb *lkb;
5685 	uint32_t lkid, remid;
5686 	int error, result;
5687 
5688 	lkid = le32_to_cpu(rl->rl_lkid);
5689 	remid = le32_to_cpu(rl->rl_remid);
5690 	result = le32_to_cpu(rl->rl_result);
5691 
5692 	error = find_lkb(ls, lkid, &lkb);
5693 	if (error) {
5694 		log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5695 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5696 			  result);
5697 		return error;
5698 	}
5699 
5700 	r = lkb->lkb_resource;
5701 	hold_rsb(r);
5702 	lock_rsb(r);
5703 
5704 	if (!is_process_copy(lkb)) {
5705 		log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5706 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5707 			  result);
5708 		dlm_dump_rsb(r);
5709 		unlock_rsb(r);
5710 		put_rsb(r);
5711 		dlm_put_lkb(lkb);
5712 		return -EINVAL;
5713 	}
5714 
5715 	switch (result) {
5716 	case -EBADR:
5717 		/* There's a chance the new master received our lock before
5718 		   dlm_recover_master_reply(), this wouldn't happen if we did
5719 		   a barrier between recover_masters and recover_locks. */
5720 
5721 		log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5722 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5723 			  result);
5724 
5725 		dlm_send_rcom_lock(r, lkb, seq);
5726 		goto out;
5727 	case -EEXIST:
5728 	case 0:
5729 		lkb->lkb_remid = remid;
5730 		break;
5731 	default:
5732 		log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5733 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5734 			  result);
5735 	}
5736 
5737 	/* an ack for dlm_recover_locks() which waits for replies from
5738 	   all the locks it sends to new masters */
5739 	dlm_recovered_lock(r);
5740  out:
5741 	unlock_rsb(r);
5742 	put_rsb(r);
5743 	dlm_put_lkb(lkb);
5744 
5745 	return 0;
5746 }
5747 
/*
 * Create a new lkb for a userspace lock request and pass it to
 * request_lock().
 *
 * @ua is owned by this function until the request has been set up: on
 * every failure up to and including set_lock_args() it is freed here with
 * kfree(), together with any lvb buffer allocated for it.
 *
 * On a request_lock() result of 0 or -EINPROGRESS the lkb gets an extra
 * reference and is added to ua->proc->locks.  On any other result the lkb
 * is dropped with __put_lkb() and is not added to that list.
 *
 * Returns 0 when request_lock() returns 0, -EINPROGRESS or -EAGAIN;
 * otherwise the error from create_lkb(), the lvb allocation (-ENOMEM),
 * set_lock_args() or request_lock().
 */
int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
		     int mode, uint32_t flags, void *name, unsigned int namelen)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	bool do_put = true;
	int error;

	dlm_lock_recovery(ls);

	error = create_lkb(ls, &lkb);
	if (error) {
		kfree(ua);
		goto out;
	}

	trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);

	if (flags & DLM_LKF_VALBLK) {
		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
		if (!ua->lksb.sb_lvbptr) {
			kfree(ua);
			error = -ENOMEM;
			goto out_put;
		}
	}
	error = set_lock_args(mode, &ua->lksb, flags, namelen, fake_astfn, ua,
			      fake_bastfn, &args);
	if (error) {
		kfree(ua->lksb.sb_lvbptr);
		ua->lksb.sb_lvbptr = NULL;
		kfree(ua);
		goto out_put;
	}

	/* After ua is attached to lkb it will be freed by dlm_free_lkb().
	   When DLM_DFL_USER_BIT is set, the dlm knows that this is a userspace
	   lock and that lkb_astparam is the dlm_user_args structure. */
	set_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags);
	error = request_lock(ls, lkb, name, namelen, &args);

	switch (error) {
	case 0:
		break;
	case -EINPROGRESS:
		error = 0;
		break;
	case -EAGAIN:
		/* reported to the caller as success, but the lkb is still
		   dropped at out_put rather than added to the proc list */
		error = 0;
		fallthrough;
	default:
		goto out_put;
	}

	/* add this new lkb to the per-process list of locks */
	spin_lock_bh(&ua->proc->locks_spin);
	hold_lkb(lkb);
	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
	spin_unlock_bh(&ua->proc->locks_spin);
	/* the lkb now lives on the proc list; skip the put below */
	do_put = false;
 out_put:
	trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
	if (do_put)
		__put_lkb(ls, lkb);
 out:
	dlm_unlock_recovery(ls);
	return error;
}
5816 
5817 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5818 		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
5819 {
5820 	struct dlm_lkb *lkb;
5821 	struct dlm_args args;
5822 	struct dlm_user_args *ua;
5823 	int error;
5824 
5825 	dlm_lock_recovery(ls);
5826 
5827 	error = find_lkb(ls, lkid, &lkb);
5828 	if (error)
5829 		goto out;
5830 
5831 	trace_dlm_lock_start(ls, lkb, NULL, 0, mode, flags);
5832 
5833 	/* user can change the params on its lock when it converts it, or
5834 	   add an lvb that didn't exist before */
5835 
5836 	ua = lkb->lkb_ua;
5837 
5838 	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5839 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5840 		if (!ua->lksb.sb_lvbptr) {
5841 			error = -ENOMEM;
5842 			goto out_put;
5843 		}
5844 	}
5845 	if (lvb_in && ua->lksb.sb_lvbptr)
5846 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5847 
5848 	ua->xid = ua_tmp->xid;
5849 	ua->castparam = ua_tmp->castparam;
5850 	ua->castaddr = ua_tmp->castaddr;
5851 	ua->bastparam = ua_tmp->bastparam;
5852 	ua->bastaddr = ua_tmp->bastaddr;
5853 	ua->user_lksb = ua_tmp->user_lksb;
5854 
5855 	error = set_lock_args(mode, &ua->lksb, flags, 0, fake_astfn, ua,
5856 			      fake_bastfn, &args);
5857 	if (error)
5858 		goto out_put;
5859 
5860 	error = convert_lock(ls, lkb, &args);
5861 
5862 	if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5863 		error = 0;
5864  out_put:
5865 	trace_dlm_lock_end(ls, lkb, NULL, 0, mode, flags, error, false);
5866 	dlm_put_lkb(lkb);
5867  out:
5868 	dlm_unlock_recovery(ls);
5869 	kfree(ua_tmp);
5870 	return error;
5871 }
5872 
5873 /*
5874  * The caller asks for an orphan lock on a given resource with a given mode.
5875  * If a matching lock exists, it's moved to the owner's list of locks and
5876  * the lkid is returned.
5877  */
5878 
5879 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5880 		     int mode, uint32_t flags, void *name, unsigned int namelen,
5881 		     uint32_t *lkid)
5882 {
5883 	struct dlm_lkb *lkb = NULL, *iter;
5884 	struct dlm_user_args *ua;
5885 	int found_other_mode = 0;
5886 	int rv = 0;
5887 
5888 	spin_lock_bh(&ls->ls_orphans_lock);
5889 	list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
5890 		if (iter->lkb_resource->res_length != namelen)
5891 			continue;
5892 		if (memcmp(iter->lkb_resource->res_name, name, namelen))
5893 			continue;
5894 		if (iter->lkb_grmode != mode) {
5895 			found_other_mode = 1;
5896 			continue;
5897 		}
5898 
5899 		lkb = iter;
5900 		list_del_init(&iter->lkb_ownqueue);
5901 		clear_bit(DLM_DFL_ORPHAN_BIT, &iter->lkb_dflags);
5902 		*lkid = iter->lkb_id;
5903 		break;
5904 	}
5905 	spin_unlock_bh(&ls->ls_orphans_lock);
5906 
5907 	if (!lkb && found_other_mode) {
5908 		rv = -EAGAIN;
5909 		goto out;
5910 	}
5911 
5912 	if (!lkb) {
5913 		rv = -ENOENT;
5914 		goto out;
5915 	}
5916 
5917 	lkb->lkb_exflags = flags;
5918 	lkb->lkb_ownpid = (int) current->pid;
5919 
5920 	ua = lkb->lkb_ua;
5921 
5922 	ua->proc = ua_tmp->proc;
5923 	ua->xid = ua_tmp->xid;
5924 	ua->castparam = ua_tmp->castparam;
5925 	ua->castaddr = ua_tmp->castaddr;
5926 	ua->bastparam = ua_tmp->bastparam;
5927 	ua->bastaddr = ua_tmp->bastaddr;
5928 	ua->user_lksb = ua_tmp->user_lksb;
5929 
5930 	/*
5931 	 * The lkb reference from the ls_orphans list was not
5932 	 * removed above, and is now considered the reference
5933 	 * for the proc locks list.
5934 	 */
5935 
5936 	spin_lock_bh(&ua->proc->locks_spin);
5937 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5938 	spin_unlock_bh(&ua->proc->locks_spin);
5939  out:
5940 	kfree(ua_tmp);
5941 	return rv;
5942 }
5943 
5944 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5945 		    uint32_t flags, uint32_t lkid, char *lvb_in)
5946 {
5947 	struct dlm_lkb *lkb;
5948 	struct dlm_args args;
5949 	struct dlm_user_args *ua;
5950 	int error;
5951 
5952 	dlm_lock_recovery(ls);
5953 
5954 	error = find_lkb(ls, lkid, &lkb);
5955 	if (error)
5956 		goto out;
5957 
5958 	trace_dlm_unlock_start(ls, lkb, flags);
5959 
5960 	ua = lkb->lkb_ua;
5961 
5962 	if (lvb_in && ua->lksb.sb_lvbptr)
5963 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5964 	if (ua_tmp->castparam)
5965 		ua->castparam = ua_tmp->castparam;
5966 	ua->user_lksb = ua_tmp->user_lksb;
5967 
5968 	error = set_unlock_args(flags, ua, &args);
5969 	if (error)
5970 		goto out_put;
5971 
5972 	error = unlock_lock(ls, lkb, &args);
5973 
5974 	if (error == -DLM_EUNLOCK)
5975 		error = 0;
5976 	/* from validate_unlock_args() */
5977 	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5978 		error = 0;
5979 	if (error)
5980 		goto out_put;
5981 
5982 	spin_lock_bh(&ua->proc->locks_spin);
5983 	/* dlm_user_add_cb() may have already taken lkb off the proc list */
5984 	if (!list_empty(&lkb->lkb_ownqueue))
5985 		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5986 	spin_unlock_bh(&ua->proc->locks_spin);
5987  out_put:
5988 	trace_dlm_unlock_end(ls, lkb, flags, error);
5989 	dlm_put_lkb(lkb);
5990  out:
5991 	dlm_unlock_recovery(ls);
5992 	kfree(ua_tmp);
5993 	return error;
5994 }
5995 
5996 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5997 		    uint32_t flags, uint32_t lkid)
5998 {
5999 	struct dlm_lkb *lkb;
6000 	struct dlm_args args;
6001 	struct dlm_user_args *ua;
6002 	int error;
6003 
6004 	dlm_lock_recovery(ls);
6005 
6006 	error = find_lkb(ls, lkid, &lkb);
6007 	if (error)
6008 		goto out;
6009 
6010 	trace_dlm_unlock_start(ls, lkb, flags);
6011 
6012 	ua = lkb->lkb_ua;
6013 	if (ua_tmp->castparam)
6014 		ua->castparam = ua_tmp->castparam;
6015 	ua->user_lksb = ua_tmp->user_lksb;
6016 
6017 	error = set_unlock_args(flags, ua, &args);
6018 	if (error)
6019 		goto out_put;
6020 
6021 	error = cancel_lock(ls, lkb, &args);
6022 
6023 	if (error == -DLM_ECANCEL)
6024 		error = 0;
6025 	/* from validate_unlock_args() */
6026 	if (error == -EBUSY)
6027 		error = 0;
6028  out_put:
6029 	trace_dlm_unlock_end(ls, lkb, flags, error);
6030 	dlm_put_lkb(lkb);
6031  out:
6032 	dlm_unlock_recovery(ls);
6033 	kfree(ua_tmp);
6034 	return error;
6035 }
6036 
6037 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
6038 {
6039 	struct dlm_lkb *lkb;
6040 	struct dlm_args args;
6041 	struct dlm_user_args *ua;
6042 	struct dlm_rsb *r;
6043 	int error;
6044 
6045 	dlm_lock_recovery(ls);
6046 
6047 	error = find_lkb(ls, lkid, &lkb);
6048 	if (error)
6049 		goto out;
6050 
6051 	trace_dlm_unlock_start(ls, lkb, flags);
6052 
6053 	ua = lkb->lkb_ua;
6054 
6055 	error = set_unlock_args(flags, ua, &args);
6056 	if (error)
6057 		goto out_put;
6058 
6059 	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
6060 
6061 	r = lkb->lkb_resource;
6062 	hold_rsb(r);
6063 	lock_rsb(r);
6064 
6065 	error = validate_unlock_args(lkb, &args);
6066 	if (error)
6067 		goto out_r;
6068 	set_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags);
6069 
6070 	error = _cancel_lock(r, lkb);
6071  out_r:
6072 	unlock_rsb(r);
6073 	put_rsb(r);
6074 
6075 	if (error == -DLM_ECANCEL)
6076 		error = 0;
6077 	/* from validate_unlock_args() */
6078 	if (error == -EBUSY)
6079 		error = 0;
6080  out_put:
6081 	trace_dlm_unlock_end(ls, lkb, flags, error);
6082 	dlm_put_lkb(lkb);
6083  out:
6084 	dlm_unlock_recovery(ls);
6085 	return error;
6086 }
6087 
6088 /* lkb's that are removed from the waiters list by revert are just left on the
6089    orphans list with the granted orphan locks, to be freed by purge */
6090 
6091 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6092 {
6093 	struct dlm_args args;
6094 	int error;
6095 
6096 	hold_lkb(lkb); /* reference for the ls_orphans list */
6097 	spin_lock_bh(&ls->ls_orphans_lock);
6098 	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6099 	spin_unlock_bh(&ls->ls_orphans_lock);
6100 
6101 	set_unlock_args(0, lkb->lkb_ua, &args);
6102 
6103 	error = cancel_lock(ls, lkb, &args);
6104 	if (error == -DLM_ECANCEL)
6105 		error = 0;
6106 	return error;
6107 }
6108 
6109 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
6110    granted.  Regardless of what rsb queue the lock is on, it's removed and
6111    freed.  The IVVALBLK flag causes the lvb on the resource to be invalidated
6112    if our lock is PW/EX (it's ignored if our granted mode is smaller.) */
6113 
6114 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6115 {
6116 	struct dlm_args args;
6117 	int error;
6118 
6119 	set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
6120 			lkb->lkb_ua, &args);
6121 
6122 	error = unlock_lock(ls, lkb, &args);
6123 	if (error == -DLM_EUNLOCK)
6124 		error = 0;
6125 	return error;
6126 }
6127 
/* We have to release the ls_clear_proc_locks spinlock before calling
   unlock_proc_lock() (which does lock_rsb) to avoid a deadlock with receiving
   a message that does lock_rsb followed by dlm_user_add_cb() */
6131 
6132 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
6133 				     struct dlm_user_proc *proc)
6134 {
6135 	struct dlm_lkb *lkb = NULL;
6136 
6137 	spin_lock_bh(&ls->ls_clear_proc_locks);
6138 	if (list_empty(&proc->locks))
6139 		goto out;
6140 
6141 	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
6142 	list_del_init(&lkb->lkb_ownqueue);
6143 
6144 	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6145 		set_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags);
6146 	else
6147 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6148  out:
6149 	spin_unlock_bh(&ls->ls_clear_proc_locks);
6150 	return lkb;
6151 }
6152 
6153 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
6154    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
6155    which we clear here. */
6156 
6157 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
6158    list, and no more device_writes should add lkb's to proc->locks list; so we
6159    shouldn't need to take asts_spin or locks_spin here.  this assumes that
6160    device reads/writes/closes are serialized -- FIXME: we may need to serialize
6161    them ourself. */
6162 
6163 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6164 {
6165 	struct dlm_callback *cb, *cb_safe;
6166 	struct dlm_lkb *lkb, *safe;
6167 
6168 	dlm_lock_recovery(ls);
6169 
6170 	while (1) {
6171 		lkb = del_proc_lock(ls, proc);
6172 		if (!lkb)
6173 			break;
6174 		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6175 			orphan_proc_lock(ls, lkb);
6176 		else
6177 			unlock_proc_lock(ls, lkb);
6178 
6179 		/* this removes the reference for the proc->locks list
6180 		   added by dlm_user_request, it may result in the lkb
6181 		   being freed */
6182 
6183 		dlm_put_lkb(lkb);
6184 	}
6185 
6186 	spin_lock_bh(&ls->ls_clear_proc_locks);
6187 
6188 	/* in-progress unlocks */
6189 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6190 		list_del_init(&lkb->lkb_ownqueue);
6191 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6192 		dlm_put_lkb(lkb);
6193 	}
6194 
6195 	list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6196 		list_del(&cb->list);
6197 		dlm_free_cb(cb);
6198 	}
6199 
6200 	spin_unlock_bh(&ls->ls_clear_proc_locks);
6201 	dlm_unlock_recovery(ls);
6202 }
6203 
6204 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6205 {
6206 	struct dlm_callback *cb, *cb_safe;
6207 	struct dlm_lkb *lkb, *safe;
6208 
6209 	while (1) {
6210 		lkb = NULL;
6211 		spin_lock_bh(&proc->locks_spin);
6212 		if (!list_empty(&proc->locks)) {
6213 			lkb = list_entry(proc->locks.next, struct dlm_lkb,
6214 					 lkb_ownqueue);
6215 			list_del_init(&lkb->lkb_ownqueue);
6216 		}
6217 		spin_unlock_bh(&proc->locks_spin);
6218 
6219 		if (!lkb)
6220 			break;
6221 
6222 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6223 		unlock_proc_lock(ls, lkb);
6224 		dlm_put_lkb(lkb); /* ref from proc->locks list */
6225 	}
6226 
6227 	spin_lock_bh(&proc->locks_spin);
6228 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6229 		list_del_init(&lkb->lkb_ownqueue);
6230 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6231 		dlm_put_lkb(lkb);
6232 	}
6233 	spin_unlock_bh(&proc->locks_spin);
6234 
6235 	spin_lock_bh(&proc->asts_spin);
6236 	list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6237 		list_del(&cb->list);
6238 		dlm_free_cb(cb);
6239 	}
6240 	spin_unlock_bh(&proc->asts_spin);
6241 }
6242 
6243 /* pid of 0 means purge all orphans */
6244 
6245 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6246 {
6247 	struct dlm_lkb *lkb, *safe;
6248 
6249 	spin_lock_bh(&ls->ls_orphans_lock);
6250 	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6251 		if (pid && lkb->lkb_ownpid != pid)
6252 			continue;
6253 		unlock_proc_lock(ls, lkb);
6254 		list_del_init(&lkb->lkb_ownqueue);
6255 		dlm_put_lkb(lkb);
6256 	}
6257 	spin_unlock_bh(&ls->ls_orphans_lock);
6258 }
6259 
6260 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6261 {
6262 	struct dlm_message *ms;
6263 	struct dlm_mhandle *mh;
6264 	int error;
6265 
6266 	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6267 				DLM_MSG_PURGE, &ms, &mh);
6268 	if (error)
6269 		return error;
6270 	ms->m_nodeid = cpu_to_le32(nodeid);
6271 	ms->m_pid = cpu_to_le32(pid);
6272 
6273 	return send_message(mh, ms, NULL, 0);
6274 }
6275 
6276 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6277 		   int nodeid, int pid)
6278 {
6279 	int error = 0;
6280 
6281 	if (nodeid && (nodeid != dlm_our_nodeid())) {
6282 		error = send_purge(ls, nodeid, pid);
6283 	} else {
6284 		dlm_lock_recovery(ls);
6285 		if (pid == current->pid)
6286 			purge_proc_locks(ls, proc);
6287 		else
6288 			do_purge(ls, nodeid, pid);
6289 		dlm_unlock_recovery(ls);
6290 	}
6291 	return error;
6292 }
6293 
6294 /* debug functionality */
6295 int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
6296 		      int lkb_nodeid, unsigned int lkb_dflags, int lkb_status)
6297 {
6298 	struct dlm_lksb *lksb;
6299 	struct dlm_lkb *lkb;
6300 	struct dlm_rsb *r;
6301 	int error;
6302 
6303 	/* we currently can't set a valid user lock */
6304 	if (lkb_dflags & BIT(DLM_DFL_USER_BIT))
6305 		return -EOPNOTSUPP;
6306 
6307 	lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
6308 	if (!lksb)
6309 		return -ENOMEM;
6310 
6311 	error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
6312 	if (error) {
6313 		kfree(lksb);
6314 		return error;
6315 	}
6316 
6317 	dlm_set_dflags_val(lkb, lkb_dflags);
6318 	lkb->lkb_nodeid = lkb_nodeid;
6319 	lkb->lkb_lksb = lksb;
6320 	/* user specific pointer, just don't have it NULL for kernel locks */
6321 	if (~lkb_dflags & BIT(DLM_DFL_USER_BIT))
6322 		lkb->lkb_astparam = (void *)0xDEADBEEF;
6323 
6324 	error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
6325 	if (error) {
6326 		kfree(lksb);
6327 		__put_lkb(ls, lkb);
6328 		return error;
6329 	}
6330 
6331 	lock_rsb(r);
6332 	attach_lkb(r, lkb);
6333 	add_lkb(r, lkb, lkb_status);
6334 	unlock_rsb(r);
6335 	put_rsb(r);
6336 
6337 	return 0;
6338 }
6339 
6340 int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
6341 				 int mstype, int to_nodeid)
6342 {
6343 	struct dlm_lkb *lkb;
6344 	int error;
6345 
6346 	error = find_lkb(ls, lkb_id, &lkb);
6347 	if (error)
6348 		return error;
6349 
6350 	add_to_waiters(lkb, mstype, to_nodeid);
6351 	dlm_put_lkb(lkb);
6352 	return 0;
6353 }
6354 
6355