xref: /linux/fs/dlm/lock.c (revision 080e5563f878c64e697b89e7439d730d0daad882)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
6 **
7 **
8 *******************************************************************************
9 ******************************************************************************/
10 
11 /* Central locking logic has four stages:
12 
13    dlm_lock()
14    dlm_unlock()
15 
16    request_lock(ls, lkb)
17    convert_lock(ls, lkb)
18    unlock_lock(ls, lkb)
19    cancel_lock(ls, lkb)
20 
21    _request_lock(r, lkb)
22    _convert_lock(r, lkb)
23    _unlock_lock(r, lkb)
24    _cancel_lock(r, lkb)
25 
26    do_request(r, lkb)
27    do_convert(r, lkb)
28    do_unlock(r, lkb)
29    do_cancel(r, lkb)
30 
31    Stage 1 (lock, unlock) is mainly about checking input args and
32    splitting into one of the four main operations:
33 
34        dlm_lock          = request_lock
35        dlm_lock+CONVERT  = convert_lock
36        dlm_unlock        = unlock_lock
37        dlm_unlock+CANCEL = cancel_lock
38 
39    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
40    provided to the next stage.
41 
42    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
43    When remote, it calls send_xxxx(), when local it calls do_xxxx().
44 
45    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
46    given rsb and lkb and queues callbacks.
47 
48    For remote operations, send_xxxx() results in the corresponding do_xxxx()
49    function being executed on the remote node.  The connecting send/receive
50    calls on local (L) and remote (R) nodes:
51 
52    L: send_xxxx()              ->  R: receive_xxxx()
53                                    R: do_xxxx()
54    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
55 */
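/* For illustration only: a rough sketch of a hypothetical caller,
 * showing how the four main operations above are selected (the
 * lockspace "ls", the resource name and the ast/bast callbacks are
 * assumed, not defined in this file):
 *
 *	dlm_lock(ls, DLM_LOCK_EX, &lksb, 0, name, len, 0,
 *		 ast, arg, bast);				// request_lock
 *	dlm_lock(ls, DLM_LOCK_PR, &lksb, DLM_LKF_CONVERT, name, len, 0,
 *		 ast, arg, bast);				// convert_lock
 *	dlm_unlock(ls, lksb.sb_lkid, 0, &lksb, arg);		// unlock_lock
 *	dlm_unlock(ls, lksb.sb_lkid, DLM_LKF_CANCEL, &lksb, arg); // cancel_lock
 */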
56 #include <trace/events/dlm.h>
57 
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "midcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77 
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89 				    const struct dlm_message *ms, bool local);
90 static int receive_extralen(const struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void deactivate_rsb(struct kref *kref);
93 
94 /*
95  * Lock compatibility matrix - thanks Steve
96  * UN = Unlocked state. Not really a state, used as a flag
97  * PD = Padding. Used to make the matrix a nice power of two in size
98  * Other states are the same as the VMS DLM.
99  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
100  */
101 
102 static const int __dlm_compat_matrix[8][8] = {
103       /* UN NL CR CW PR PW EX PD */
104         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
105         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
106         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
107         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
108         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
109         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
110         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
111         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
112 };
113 
114 /*
115  * This defines the direction of transfer of LVB data.
116  * Granted mode is the row; requested mode is the column.
117  * Usage: matrix[grmode+1][rqmode+1]
118  * 1 = LVB is returned to the caller
119  * 0 = LVB is written to the resource
120  * -1 = nothing happens to the LVB
121  */
122 
123 const int dlm_lvb_operations[8][8] = {
124         /* UN   NL  CR  CW  PR  PW  EX  PD*/
125         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
126         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
127         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
128         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
129         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
130         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
131         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
132         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
133 };
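/* Illustrative examples, values read from the table above:
 * converting up from NL (granted) to EX (requested),
 * dlm_lvb_operations[DLM_LOCK_NL + 1][DLM_LOCK_EX + 1] == 1, returns
 * the LVB to the caller; converting down from EX to NL,
 * dlm_lvb_operations[DLM_LOCK_EX + 1][DLM_LOCK_NL + 1] == 0, writes
 * the caller's LVB to the resource.
 */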
134 
135 #define modes_compat(gr, rq) \
136 	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
137 
138 int dlm_modes_compat(int mode1, int mode2)
139 {
140 	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
141 }
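/* Illustrative examples, values read from __dlm_compat_matrix above:
 *
 *	dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR);	// 1, PR coexists with PR
 *	dlm_modes_compat(DLM_LOCK_EX, DLM_LOCK_PR);	// 0, EX excludes PR
 *	dlm_modes_compat(DLM_LOCK_NL, DLM_LOCK_EX);	// 1, NL coexists with any mode
 */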
142 
143 /*
144  * Compatibility matrix for conversions with QUECVT set.
145  * Granted mode is the row; requested mode is the column.
146  * Usage: matrix[grmode+1][rqmode+1]
147  */
148 
149 static const int __quecvt_compat_matrix[8][8] = {
150       /* UN NL CR CW PR PW EX PD */
151         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
152         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
153         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
154         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
155         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
156         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
157         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
158         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
159 };
160 
161 void dlm_print_lkb(struct dlm_lkb *lkb)
162 {
163 	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
164 	       "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
165 	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166 	       dlm_iflags_val(lkb), lkb->lkb_status, lkb->lkb_rqmode,
167 	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
168 	       (unsigned long long)lkb->lkb_recover_seq);
169 }
170 
171 static void dlm_print_rsb(struct dlm_rsb *r)
172 {
173 	printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
174 	       "rlc %d name %s\n",
175 	       r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
176 	       r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
177 	       r->res_name);
178 }
179 
180 void dlm_dump_rsb(struct dlm_rsb *r)
181 {
182 	struct dlm_lkb *lkb;
183 
184 	dlm_print_rsb(r);
185 
186 	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
187 	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
188 	printk(KERN_ERR "rsb lookup list\n");
189 	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
190 		dlm_print_lkb(lkb);
191 	printk(KERN_ERR "rsb grant queue:\n");
192 	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
193 		dlm_print_lkb(lkb);
194 	printk(KERN_ERR "rsb convert queue:\n");
195 	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
196 		dlm_print_lkb(lkb);
197 	printk(KERN_ERR "rsb wait queue:\n");
198 	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
199 		dlm_print_lkb(lkb);
200 }
201 
202 /* Threads cannot use the lockspace while it's being recovered */
203 
204 void dlm_lock_recovery(struct dlm_ls *ls)
205 {
206 	down_read(&ls->ls_in_recovery);
207 }
208 
209 void dlm_unlock_recovery(struct dlm_ls *ls)
210 {
211 	up_read(&ls->ls_in_recovery);
212 }
213 
214 int dlm_lock_recovery_try(struct dlm_ls *ls)
215 {
216 	return down_read_trylock(&ls->ls_in_recovery);
217 }
218 
219 static inline int can_be_queued(struct dlm_lkb *lkb)
220 {
221 	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
222 }
223 
224 static inline int force_blocking_asts(struct dlm_lkb *lkb)
225 {
226 	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
227 }
228 
229 static inline int is_demoted(struct dlm_lkb *lkb)
230 {
231 	return test_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
232 }
233 
234 static inline int is_altmode(struct dlm_lkb *lkb)
235 {
236 	return test_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
237 }
238 
239 static inline int is_granted(struct dlm_lkb *lkb)
240 {
241 	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
242 }
243 
244 static inline int is_remote(struct dlm_rsb *r)
245 {
246 	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
247 	return !!r->res_nodeid;
248 }
249 
250 static inline int is_process_copy(struct dlm_lkb *lkb)
251 {
252 	return lkb->lkb_nodeid &&
253 	       !test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
254 }
255 
256 static inline int is_master_copy(struct dlm_lkb *lkb)
257 {
258 	return test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
259 }
260 
261 static inline int middle_conversion(struct dlm_lkb *lkb)
262 {
263 	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264 	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265 		return 1;
266 	return 0;
267 }
268 
269 static inline int down_conversion(struct dlm_lkb *lkb)
270 {
271 	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272 }
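/* For illustration: PR<->CW is the "middle" case because the two modes
 * are mutually incompatible (see __dlm_compat_matrix), so such a
 * conversion cannot simply be granted in place even though CW has the
 * lower numeric value; EX->PR or CW->CR are ordinary down-conversions.
 */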
273 
274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
275 {
276 	return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
277 }
278 
279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
280 {
281 	return test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
282 }
283 
284 static inline int is_overlap(struct dlm_lkb *lkb)
285 {
286 	return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags) ||
287 	       test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
288 }
289 
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291 {
292 	if (is_master_copy(lkb))
293 		return;
294 
295 	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
296 
297 	if (rv == -DLM_ECANCEL &&
298 	    test_and_clear_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags))
299 		rv = -EDEADLK;
300 
301 	dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, dlm_sbflags_val(lkb));
302 }
303 
304 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
305 {
306 	queue_cast(r, lkb,
307 		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
308 }
309 
310 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
311 {
312 	if (is_master_copy(lkb)) {
313 		send_bast(r, lkb, rqmode);
314 	} else {
315 		dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
316 	}
317 }
318 
319 /*
320  * Basic operations on rsb's and lkb's
321  */
322 
323 static inline unsigned long rsb_toss_jiffies(void)
324 {
325 	return jiffies + (READ_ONCE(dlm_config.ci_toss_secs) * HZ);
326 }
327 
328 /* This is only called to add a reference when the code already holds
329    a valid reference to the rsb, so there's no need for locking. */
330 
331 static inline void hold_rsb(struct dlm_rsb *r)
332 {
333 	/* inactive rsbs are not ref counted */
334 	WARN_ON(rsb_flag(r, RSB_INACTIVE));
335 	kref_get(&r->res_ref);
336 }
337 
338 void dlm_hold_rsb(struct dlm_rsb *r)
339 {
340 	hold_rsb(r);
341 }
342 
343 /* TODO move this to lib/refcount.c */
344 static __must_check bool
345 dlm_refcount_dec_and_write_lock_bh(refcount_t *r, rwlock_t *lock)
346 __cond_acquires(lock)
347 {
348 	if (refcount_dec_not_one(r))
349 		return false;
350 
351 	write_lock_bh(lock);
352 	if (!refcount_dec_and_test(r)) {
353 		write_unlock_bh(lock);
354 		return false;
355 	}
356 
357 	return true;
358 }
359 
360 /* TODO move this to include/linux/kref.h */
361 static inline int dlm_kref_put_write_lock_bh(struct kref *kref,
362 					     void (*release)(struct kref *kref),
363 					     rwlock_t *lock)
364 {
365 	if (dlm_refcount_dec_and_write_lock_bh(&kref->refcount, lock)) {
366 		release(kref);
367 		return 1;
368 	}
369 
370 	return 0;
371 }
372 
373 static void put_rsb(struct dlm_rsb *r)
374 {
375 	struct dlm_ls *ls = r->res_ls;
376 	int rv;
377 
378 	rv = dlm_kref_put_write_lock_bh(&r->res_ref, deactivate_rsb,
379 					&ls->ls_rsbtbl_lock);
380 	if (rv)
381 		write_unlock_bh(&ls->ls_rsbtbl_lock);
382 }
383 
384 void dlm_put_rsb(struct dlm_rsb *r)
385 {
386 	put_rsb(r);
387 }
388 
389 /* Pairs with timer_delete_sync() in dlm_ls_stop(): don't arm new
390  * timers once recovery is triggered; they are not run again until
391  * resume_scan_timer() re-arms the timer.
392  */
393 static void enable_scan_timer(struct dlm_ls *ls, unsigned long jiffies)
394 {
395 	if (!dlm_locking_stopped(ls))
396 		mod_timer(&ls->ls_scan_timer, jiffies);
397 }
398 
399 /* This function tries to resume the timer callback if an rsb
400  * is on the scan list and no timer is pending. The first entry
401  * might currently be executing as the timer callback, but we
402  * don't care if a timer is queued up again and does nothing.
403  * That should be a rare case.
404  */
405 void resume_scan_timer(struct dlm_ls *ls)
406 {
407 	struct dlm_rsb *r;
408 
409 	spin_lock_bh(&ls->ls_scan_lock);
410 	r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
411 				     res_scan_list);
412 	if (r && !timer_pending(&ls->ls_scan_timer))
413 		enable_scan_timer(ls, r->res_toss_time);
414 	spin_unlock_bh(&ls->ls_scan_lock);
415 }
416 
417 /* ls_rsbtbl_lock must be held */
418 
419 static void del_scan(struct dlm_ls *ls, struct dlm_rsb *r)
420 {
421 	struct dlm_rsb *first;
422 
423 	/* active rsbs should never be on the scan list */
424 	WARN_ON(!rsb_flag(r, RSB_INACTIVE));
425 
426 	spin_lock_bh(&ls->ls_scan_lock);
427 	r->res_toss_time = 0;
428 
429 	/* if the rsb is not queued do nothing */
430 	if (list_empty(&r->res_scan_list))
431 		goto out;
432 
433 	/* get the first element before delete */
434 	first = list_first_entry(&ls->ls_scan_list, struct dlm_rsb,
435 				 res_scan_list);
436 	list_del_init(&r->res_scan_list);
437 	/* check if the first element was the rsb we deleted */
438 	if (first == r) {
439 		/* Get the new first element. If the list is now
440 		 * empty, try to delete the timer (if we are too
441 		 * late we don't care).
442 		 *
443 		 * If the list isn't empty and a new first element
444 		 * is in place, set the new timer expire time.
445 		 */
446 		first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
447 						 res_scan_list);
448 		if (!first)
449 			timer_delete(&ls->ls_scan_timer);
450 		else
451 			enable_scan_timer(ls, first->res_toss_time);
452 	}
453 
454 out:
455 	spin_unlock_bh(&ls->ls_scan_lock);
456 }
457 
458 static void add_scan(struct dlm_ls *ls, struct dlm_rsb *r)
459 {
460 	int our_nodeid = dlm_our_nodeid();
461 	struct dlm_rsb *first;
462 
463 	/* A dir record for a remote master rsb should never be on the scan list. */
464 	WARN_ON(!dlm_no_directory(ls) &&
465 		(r->res_master_nodeid != our_nodeid) &&
466 		(dlm_dir_nodeid(r) == our_nodeid));
467 
468 	/* An active rsb should never be on the scan list. */
469 	WARN_ON(!rsb_flag(r, RSB_INACTIVE));
470 
471 	/* An rsb should not already be on the scan list. */
472 	WARN_ON(!list_empty(&r->res_scan_list));
473 
474 	spin_lock_bh(&ls->ls_scan_lock);
475 	/* set the new rsb absolute expire time in the rsb */
476 	r->res_toss_time = rsb_toss_jiffies();
477 	if (list_empty(&ls->ls_scan_list)) {
478 		/* if the queue is empty, add the element and its
479 		 * expire time becomes our next expiration
480 		 */
481 		list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
482 		enable_scan_timer(ls, r->res_toss_time);
483 	} else {
484 		/* Get the (possibly new) first element, then add this
485 		 * rsb, which has the latest expire time, to the end of
486 		 * the queue. If the list was empty before, this rsb's
487 		 * expire time is our next expiration; if it wasn't,
488 		 * the current first element sets the expiration time.
489 		 */
490 		first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
491 						 res_scan_list);
492 		list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
493 		if (!first)
494 			enable_scan_timer(ls, r->res_toss_time);
495 		else
496 			enable_scan_timer(ls, first->res_toss_time);
497 	}
498 	spin_unlock_bh(&ls->ls_scan_lock);
499 }
500 
501 /* If we hit contention, retry the trylock after 250 ms. If any
502  * other mod_timer happens in between, we don't care that it expires
503  * earlier; this is only for the unlikely case that nothing happened
504  * in this time.
505  */
506 #define DLM_TOSS_TIMER_RETRY	(jiffies + msecs_to_jiffies(250))
507 
508 /* Called by lockspace scan_timer to free unused rsb's. */
509 
510 void dlm_rsb_scan(struct timer_list *timer)
511 {
512 	struct dlm_ls *ls = timer_container_of(ls, timer, ls_scan_timer);
513 	int our_nodeid = dlm_our_nodeid();
514 	struct dlm_rsb *r;
515 	int rv;
516 
517 	while (1) {
518 		/* Interruption point to leave the iteration when
519 		 * recovery waits for timer_delete_sync(); recovery
520 		 * will take care of deleting everything on the scan list.
521 		 */
522 		if (dlm_locking_stopped(ls))
523 			break;
524 
525 		rv = spin_trylock(&ls->ls_scan_lock);
526 		if (!rv) {
527 			/* rearm the retry timer */
528 			enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
529 			break;
530 		}
531 
532 		r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
533 					     res_scan_list);
534 		if (!r) {
535 			/* the next add_scan will enable the timer again */
536 			spin_unlock(&ls->ls_scan_lock);
537 			break;
538 		}
539 
540 		/*
541 		 * If the first rsb is not yet expired, then stop because the
542 		 * list is sorted with nearest expiration first.
543 		 */
544 		if (time_before(jiffies, r->res_toss_time)) {
545 			/* rearm with the next rsb to expire in the future */
546 			enable_scan_timer(ls, r->res_toss_time);
547 			spin_unlock(&ls->ls_scan_lock);
548 			break;
549 		}
550 
551 		/* find_rsb_dir/nodir take this lock in the reverse
552 		 * order; however this is only a trylock, so if we
553 		 * hit possible contention we just try again.
554 		 */
555 		rv = write_trylock(&ls->ls_rsbtbl_lock);
556 		if (!rv) {
557 			spin_unlock(&ls->ls_scan_lock);
558 			/* rearm the retry timer */
559 			enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
560 			break;
561 		}
562 
563 		list_del(&r->res_slow_list);
564 		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
565 				       dlm_rhash_rsb_params);
566 		rsb_clear_flag(r, RSB_HASHED);
567 
568 		/* ls_rsbtbl_lock is not needed when calling send_remove() */
569 		write_unlock(&ls->ls_rsbtbl_lock);
570 
571 		list_del_init(&r->res_scan_list);
572 		spin_unlock(&ls->ls_scan_lock);
573 
574 		/* An rsb that is a dir record for a remote master rsb
575 		 * cannot be removed, and should not have a timer enabled.
576 		 */
577 		WARN_ON(!dlm_no_directory(ls) &&
578 			(r->res_master_nodeid != our_nodeid) &&
579 			(dlm_dir_nodeid(r) == our_nodeid));
580 
581 		/* We're the master of this rsb but we're not
582 		 * the directory record, so we need to tell the
583 		 * dir node to remove the dir record
584 		 */
585 		if (!dlm_no_directory(ls) &&
586 		    (r->res_master_nodeid == our_nodeid) &&
587 		    (dlm_dir_nodeid(r) != our_nodeid))
588 			send_remove(r);
589 
590 		free_inactive_rsb(r);
591 	}
592 }
593 
594 /* Allocate and initialize a new rsb struct with the given name.
595    Returns -ENOMEM if the allocation fails.  The caller is
596    responsible for inserting the rsb into the rsbtbl. */
597 
598 static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
599 			  struct dlm_rsb **r_ret)
600 {
601 	struct dlm_rsb *r;
602 
603 	r = dlm_allocate_rsb();
604 	if (!r)
605 		return -ENOMEM;
606 
607 	r->res_ls = ls;
608 	r->res_length = len;
609 	memcpy(r->res_name, name, len);
610 	spin_lock_init(&r->res_lock);
611 
612 	INIT_LIST_HEAD(&r->res_lookup);
613 	INIT_LIST_HEAD(&r->res_grantqueue);
614 	INIT_LIST_HEAD(&r->res_convertqueue);
615 	INIT_LIST_HEAD(&r->res_waitqueue);
616 	INIT_LIST_HEAD(&r->res_root_list);
617 	INIT_LIST_HEAD(&r->res_scan_list);
618 	INIT_LIST_HEAD(&r->res_recover_list);
619 	INIT_LIST_HEAD(&r->res_masters_list);
620 
621 	*r_ret = r;
622 	return 0;
623 }
624 
625 int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
626 			struct dlm_rsb **r_ret)
627 {
628 	char key[DLM_RESNAME_MAXLEN] = {};
629 	if (len > DLM_RESNAME_MAXLEN)
630 		return -EINVAL;
631 	memcpy(key, name, len);
632 	*r_ret = rhashtable_lookup_fast(rhash, &key, dlm_rhash_rsb_params);
633 	if (*r_ret)
634 		return 0;
635 
636 	return -EBADR;
637 }
638 
639 static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
640 {
641 	int rv;
642 
643 	rv = rhashtable_insert_fast(rhash, &rsb->res_node,
644 				    dlm_rhash_rsb_params);
645 	if (!rv)
646 		rsb_set_flag(rsb, RSB_HASHED);
647 
648 	return rv;
649 }
650 
651 /*
652  * Find rsb in rsbtbl and potentially create/add one
653  *
654  * Delaying the release of rsb's has a similar benefit to applications keeping
655  * NL locks on an rsb, but without the guarantee that the cached master value
656  * will still be valid when the rsb is reused.  Apps aren't always smart enough
657  * to keep NL locks on an rsb that they may lock again shortly; this can lead
658  * to excessive master lookups and removals if we don't delay the release.
659  *
660  * Searching for an rsb means looking through both the normal list and toss
661  * list.  When found on the toss list the rsb is moved to the normal list with
662  * ref count of 1; when found on normal list the ref count is incremented.
663  *
664  * rsb's on the keep list are being used locally and refcounted.
665  * rsb's on the toss list are not being used locally, and are not refcounted.
666  *
667  * The toss list rsb's were either
668  * - previously used locally but not any more (were on keep list, then
669  *   moved to toss list when last refcount dropped)
670  * - created and put on toss list as a directory record for a lookup
671  *   (we are the dir node for the res, but are not using the res right now,
672  *   but some other node is)
673  *
674  * The purpose of find_rsb() is to return a refcounted rsb for local use.
675  * So, if the given rsb is on the toss list, it is moved to the keep list
676  * before being returned.
677  *
678  * deactivate_rsb() happens when all local usage of the rsb is done, i.e. no
679  * more refcounts exist, so the rsb is moved from the keep list to the
680  * toss list.
681  *
682  * rsb's on both keep and toss lists are used for doing a name to master
683  * lookups.  rsb's that are in use locally (and being refcounted) are on
684  * the keep list, rsb's that are not in use locally (not refcounted) and
685  * only exist for name/master lookups are on the toss list.
686  *
687  * rsb's on the toss list whose dir_nodeid is not local can have stale
688  * name/master mappings.  So, remote requests on such rsb's can potentially
689  * return with an error, which means the mapping is stale and needs to
690  * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
691  * first_lkid is to keep only a single outstanding request on an rsb
692  * while that rsb has a potentially stale master.)
693  */
694 
695 static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
696 			uint32_t hash, int dir_nodeid, int from_nodeid,
697 			unsigned int flags, struct dlm_rsb **r_ret)
698 {
699 	struct dlm_rsb *r = NULL;
700 	int our_nodeid = dlm_our_nodeid();
701 	int from_local = 0;
702 	int from_other = 0;
703 	int from_dir = 0;
704 	int create = 0;
705 	int error;
706 
707 	if (flags & R_RECEIVE_REQUEST) {
708 		if (from_nodeid == dir_nodeid)
709 			from_dir = 1;
710 		else
711 			from_other = 1;
712 	} else if (flags & R_REQUEST) {
713 		from_local = 1;
714 	}
715 
716 	/*
717 	 * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
718 	 * from_nodeid has sent us a lock in dlm_recover_locks, believing
719 	 * we're the new master.  Our local recovery may not have set
720 	 * res_master_nodeid to our_nodeid yet, so allow either.  Don't
721 	 * create the rsb; dlm_recover_process_copy() will handle EBADR
722 	 * by resending.
723 	 *
724 	 * If someone sends us a request, we are the dir node, and we do
725 	 * not find the rsb anywhere, then recreate it.  This happens if
726 	 * someone sends us a request after we have removed/freed an rsb.
727 	 * (They sent a request instead of lookup because they are using
728 	 * an rsb taken from their scan list.)
729 	 */
730 
731 	if (from_local || from_dir ||
732 	    (from_other && (dir_nodeid == our_nodeid))) {
733 		create = 1;
734 	}
735 
736  retry:
737 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
738 	if (error)
739 		goto do_new;
740 
741 	/* check if the rsb is active under read lock - likely path */
742 	read_lock_bh(&ls->ls_rsbtbl_lock);
743 	if (!rsb_flag(r, RSB_HASHED)) {
744 		read_unlock_bh(&ls->ls_rsbtbl_lock);
745 		error = -EBADR;
746 		goto do_new;
747 	}
748 
749 	/*
750 	 * rsb is active, so we can't check master_nodeid without lock_rsb.
751 	 */
752 
753 	if (rsb_flag(r, RSB_INACTIVE)) {
754 		read_unlock_bh(&ls->ls_rsbtbl_lock);
755 		goto do_inactive;
756 	}
757 
758 	kref_get(&r->res_ref);
759 	read_unlock_bh(&ls->ls_rsbtbl_lock);
760 	goto out;
761 
762 
763  do_inactive:
764 	write_lock_bh(&ls->ls_rsbtbl_lock);
765 
766 	/*
767 	 * The expectation here is that the rsb will have HASHED and
768 	 * INACTIVE flags set, and that the rsb can be moved from
769 	 * inactive back to active again.  However, between releasing
770 	 * the read lock and acquiring the write lock, this rsb could
771 	 * have been removed from rsbtbl, and had HASHED cleared, to
772 	 * be freed.  To deal with this case, we would normally need
773 	 * to repeat dlm_search_rsb_tree while holding the write lock,
774 	 * but rcu allows us to simply check the HASHED flag, because
775 	 * the rcu read lock means the rsb will not be freed yet.
776 	 * If the HASHED flag is not set, then the rsb is being freed,
777 	 * so we add a new rsb struct.  If the HASHED flag is set,
778 	 * and INACTIVE is not set, it means another thread has
779 	 * made the rsb active, as we're expecting to do here, and
780 	 * we just repeat the lookup (this will be very unlikely.)
781 	 */
782 	if (rsb_flag(r, RSB_HASHED)) {
783 		if (!rsb_flag(r, RSB_INACTIVE)) {
784 			write_unlock_bh(&ls->ls_rsbtbl_lock);
785 			goto retry;
786 		}
787 	} else {
788 		write_unlock_bh(&ls->ls_rsbtbl_lock);
789 		error = -EBADR;
790 		goto do_new;
791 	}
792 
793 	/*
794 	 * rsb found inactive (master_nodeid may be out of date unless
795 	 * we are the dir_nodeid or were the master).  No other thread
796 	 * is using this rsb because it's inactive, so we can
797 	 * look at or update res_master_nodeid without lock_rsb.
798 	 */
799 
800 	if ((r->res_master_nodeid != our_nodeid) && from_other) {
801 		/* our rsb was not master, and another node (not the dir node)
802 		   has sent us a request */
803 		log_debug(ls, "find_rsb inactive from_other %d master %d dir %d %s",
804 			  from_nodeid, r->res_master_nodeid, dir_nodeid,
805 			  r->res_name);
806 		write_unlock_bh(&ls->ls_rsbtbl_lock);
807 		error = -ENOTBLK;
808 		goto out;
809 	}
810 
811 	if ((r->res_master_nodeid != our_nodeid) && from_dir) {
812 		/* don't think this should ever happen */
813 		log_error(ls, "find_rsb inactive from_dir %d master %d",
814 			  from_nodeid, r->res_master_nodeid);
815 		dlm_print_rsb(r);
816 		/* fix it and go on */
817 		r->res_master_nodeid = our_nodeid;
818 		r->res_nodeid = 0;
819 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
820 		r->res_first_lkid = 0;
821 	}
822 
823 	if (from_local && (r->res_master_nodeid != our_nodeid)) {
824 		/* Because we have held no locks on this rsb,
825 		   res_master_nodeid could have become stale. */
826 		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
827 		r->res_first_lkid = 0;
828 	}
829 
830 	/* We always remove the rsb from the scan list when we
831 	 * move it out of the inactive state, since rsb state
832 	 * can change and scan timers are only for inactive
833 	 * rsbs.
834 	 */
835 	del_scan(ls, r);
836 	list_move(&r->res_slow_list, &ls->ls_slow_active);
837 	rsb_clear_flag(r, RSB_INACTIVE);
838 	kref_init(&r->res_ref); /* ref is now used in active state */
839 	write_unlock_bh(&ls->ls_rsbtbl_lock);
840 
841 	goto out;
842 
843 
844  do_new:
845 	/*
846 	 * rsb not found
847 	 */
848 
849 	if (error == -EBADR && !create)
850 		goto out;
851 
852 	error = get_rsb_struct(ls, name, len, &r);
853 	if (WARN_ON_ONCE(error))
854 		goto out;
855 
856 	r->res_hash = hash;
857 	r->res_dir_nodeid = dir_nodeid;
858 	kref_init(&r->res_ref);
859 
860 	if (from_dir) {
861 		/* want to see how often this happens */
862 		log_debug(ls, "find_rsb new from_dir %d recreate %s",
863 			  from_nodeid, r->res_name);
864 		r->res_master_nodeid = our_nodeid;
865 		r->res_nodeid = 0;
866 		goto out_add;
867 	}
868 
869 	if (from_other && (dir_nodeid != our_nodeid)) {
870 		/* should never happen */
871 		log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
872 			  from_nodeid, dir_nodeid, our_nodeid, r->res_name);
873 		dlm_free_rsb(r);
874 		r = NULL;
875 		error = -ENOTBLK;
876 		goto out;
877 	}
878 
879 	if (from_other) {
880 		log_debug(ls, "find_rsb new from_other %d dir %d %s",
881 			  from_nodeid, dir_nodeid, r->res_name);
882 	}
883 
884 	if (dir_nodeid == our_nodeid) {
885 		/* When we are the dir nodeid, we can set the master
886 		   node immediately */
887 		r->res_master_nodeid = our_nodeid;
888 		r->res_nodeid = 0;
889 	} else {
890 		/* set_master will send_lookup to dir_nodeid */
891 		r->res_master_nodeid = 0;
892 		r->res_nodeid = -1;
893 	}
894 
895  out_add:
896 
897 	write_lock_bh(&ls->ls_rsbtbl_lock);
898 	error = rsb_insert(r, &ls->ls_rsbtbl);
899 	if (error == -EEXIST) {
900 		/* somebody else was faster and the rsb now
901 		 * exists, so do a complete relookup
902 		 */
903 		write_unlock_bh(&ls->ls_rsbtbl_lock);
904 		dlm_free_rsb(r);
905 		goto retry;
906 	} else if (!error) {
907 		list_add(&r->res_slow_list, &ls->ls_slow_active);
908 	}
909 	write_unlock_bh(&ls->ls_rsbtbl_lock);
910  out:
911 	*r_ret = r;
912 	return error;
913 }
914 
915 /* During recovery, other nodes can send us new MSTCPY locks (from
916    dlm_recover_locks) before we've made ourselves master (in
917    dlm_recover_masters). */
918 
919 static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
920 			  uint32_t hash, int dir_nodeid, int from_nodeid,
921 			  unsigned int flags, struct dlm_rsb **r_ret)
922 {
923 	struct dlm_rsb *r = NULL;
924 	int our_nodeid = dlm_our_nodeid();
925 	int recover = (flags & R_RECEIVE_RECOVER);
926 	int error;
927 
928  retry:
929 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
930 	if (error)
931 		goto do_new;
932 
933 	/* check if the rsb is in active state under read lock - likely path */
934 	read_lock_bh(&ls->ls_rsbtbl_lock);
935 	if (!rsb_flag(r, RSB_HASHED)) {
936 		read_unlock_bh(&ls->ls_rsbtbl_lock);
937 		goto do_new;
938 	}
939 
940 	if (rsb_flag(r, RSB_INACTIVE)) {
941 		read_unlock_bh(&ls->ls_rsbtbl_lock);
942 		goto do_inactive;
943 	}
944 
945 	/*
946 	 * rsb is active, so we can't check master_nodeid without lock_rsb.
947 	 */
948 
949 	kref_get(&r->res_ref);
950 	read_unlock_bh(&ls->ls_rsbtbl_lock);
951 
952 	goto out;
953 
954 
955  do_inactive:
956 	write_lock_bh(&ls->ls_rsbtbl_lock);
957 
958 	/* See comment in find_rsb_dir. */
959 	if (rsb_flag(r, RSB_HASHED)) {
960 		if (!rsb_flag(r, RSB_INACTIVE)) {
961 			write_unlock_bh(&ls->ls_rsbtbl_lock);
962 			goto retry;
963 		}
964 	} else {
965 		write_unlock_bh(&ls->ls_rsbtbl_lock);
966 		goto do_new;
967 	}
968 
969 
970 	/*
971 	 * rsb found inactive. No other thread is using this rsb because
972 	 * it's inactive, so we can look at or update res_master_nodeid
973 	 * without lock_rsb.
974 	 */
975 
976 	if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
977 		/* our rsb is not master, and another node has sent us a
978 		   request; this should never happen */
979 		log_error(ls, "find_rsb inactive from_nodeid %d master %d dir %d",
980 			  from_nodeid, r->res_master_nodeid, dir_nodeid);
981 		dlm_print_rsb(r);
982 		write_unlock_bh(&ls->ls_rsbtbl_lock);
983 		error = -ENOTBLK;
984 		goto out;
985 	}
986 
987 	if (!recover && (r->res_master_nodeid != our_nodeid) &&
988 	    (dir_nodeid == our_nodeid)) {
989 		/* our rsb is not master, and we are dir; may as well fix it;
990 		   this should never happen */
991 		log_error(ls, "find_rsb inactive our %d master %d dir %d",
992 			  our_nodeid, r->res_master_nodeid, dir_nodeid);
993 		dlm_print_rsb(r);
994 		r->res_master_nodeid = our_nodeid;
995 		r->res_nodeid = 0;
996 	}
997 
998 	del_scan(ls, r);
999 	list_move(&r->res_slow_list, &ls->ls_slow_active);
1000 	rsb_clear_flag(r, RSB_INACTIVE);
1001 	kref_init(&r->res_ref);
1002 	write_unlock_bh(&ls->ls_rsbtbl_lock);
1003 
1004 	goto out;
1005 
1006 
1007  do_new:
1008 	/*
1009 	 * rsb not found
1010 	 */
1011 
1012 	error = get_rsb_struct(ls, name, len, &r);
1013 	if (WARN_ON_ONCE(error))
1014 		goto out;
1015 
1016 	r->res_hash = hash;
1017 	r->res_dir_nodeid = dir_nodeid;
1018 	r->res_master_nodeid = dir_nodeid;
1019 	r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
1020 	kref_init(&r->res_ref);
1021 
1022 	write_lock_bh(&ls->ls_rsbtbl_lock);
1023 	error = rsb_insert(r, &ls->ls_rsbtbl);
1024 	if (error == -EEXIST) {
1025 		/* somebody else was faster and the rsb now
1026 		 * exists, so do a complete relookup
1027 		 */
1028 		write_unlock_bh(&ls->ls_rsbtbl_lock);
1029 		dlm_free_rsb(r);
1030 		goto retry;
1031 	} else if (!error) {
1032 		list_add(&r->res_slow_list, &ls->ls_slow_active);
1033 	}
1034 	write_unlock_bh(&ls->ls_rsbtbl_lock);
1035 
1036  out:
1037 	*r_ret = r;
1038 	return error;
1039 }
1040 
1041 /*
1042  * rsb rcu usage
1043  *
1044  * While rcu read lock is held, the rsb cannot be freed,
1045  * which allows a lookup optimization.
1046  *
1047  * Two threads are accessing the same rsb concurrently,
1048  * the first (A) is trying to use the rsb, the second (B)
1049  * is trying to free the rsb.
1050  *
1051  * thread A                 thread B
1052  * (trying to use rsb)      (trying to free rsb)
1053  *
1054  * A1. rcu read lock
1055  * A2. rsbtbl read lock
1056  * A3. look up rsb in rsbtbl
1057  * A4. rsbtbl read unlock
1058  *                          B1. rsbtbl write lock
1059  *                          B2. look up rsb in rsbtbl
1060  *                          B3. remove rsb from rsbtbl
1061  *                          B4. clear rsb HASHED flag
1062  *                          B5. rsbtbl write unlock
1063  *                          B6. begin freeing rsb using rcu...
1064  *
1065  * (rsb is inactive, so try to make it active again)
1066  * A5. read rsb HASHED flag (safe because rsb is not freed yet)
1067  * A6. the rsb HASHED flag is not set, which means the rsb
1068  *     is being removed from rsbtbl and freed, so don't use it.
1069  * A7. rcu read unlock
1070  *
1071  *                          B7. ...finish freeing rsb using rcu
1072  * A8. create a new rsb
1073  *
1074  * Without the rcu optimization, steps A5-8 would need to do
1075  * an extra rsbtbl lookup:
1076  * A5. rsbtbl write lock
1077  * A6. look up rsb in rsbtbl, not found
1078  * A7. rsbtbl write unlock
1079  * A8. create a new rsb
1080  */
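/* A minimal sketch of thread A's fast path, assuming the locks and
 * flags described above; the real versions are find_rsb_dir() and
 * find_rsb_nodir() above:
 *
 *	rcu_read_lock();
 *	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
 *	if (!error) {
 *		read_lock_bh(&ls->ls_rsbtbl_lock);
 *		if (rsb_flag(r, RSB_HASHED) && !rsb_flag(r, RSB_INACTIVE))
 *			kref_get(&r->res_ref);	// still hashed and active, use it
 *		else
 *			error = -EBADR;		// being freed or inactive, slow path
 *		read_unlock_bh(&ls->ls_rsbtbl_lock);
 *	}
 *	rcu_read_unlock();
 */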
1081 
1082 static int find_rsb(struct dlm_ls *ls, const void *name, int len,
1083 		    int from_nodeid, unsigned int flags,
1084 		    struct dlm_rsb **r_ret)
1085 {
1086 	int dir_nodeid;
1087 	uint32_t hash;
1088 	int rv;
1089 
1090 	if (len > DLM_RESNAME_MAXLEN)
1091 		return -EINVAL;
1092 
1093 	hash = jhash(name, len, 0);
1094 	dir_nodeid = dlm_hash2nodeid(ls, hash);
1095 
1096 	rcu_read_lock();
1097 	if (dlm_no_directory(ls))
1098 		rv = find_rsb_nodir(ls, name, len, hash, dir_nodeid,
1099 				      from_nodeid, flags, r_ret);
1100 	else
1101 		rv = find_rsb_dir(ls, name, len, hash, dir_nodeid,
1102 				    from_nodeid, flags, r_ret);
1103 	rcu_read_unlock();
1104 	return rv;
1105 }
1106 
1107 /* we have received a request and found that res_master_nodeid != our_nodeid,
1108    so we need to return an error or make ourselves the master */
1109 
1110 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
1111 				  int from_nodeid)
1112 {
1113 	if (dlm_no_directory(ls)) {
1114 		log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
1115 			  from_nodeid, r->res_master_nodeid,
1116 			  r->res_dir_nodeid);
1117 		dlm_print_rsb(r);
1118 		return -ENOTBLK;
1119 	}
1120 
1121 	if (from_nodeid != r->res_dir_nodeid) {
1122 		/* our rsb is not master, and another node (not the dir node)
1123 	   	   has sent us a request.  this is much more common when our
1124 	   	   master_nodeid is zero, so limit debug to non-zero.  */
1125 
1126 		if (r->res_master_nodeid) {
1127 			log_debug(ls, "validate master from_other %d master %d "
1128 				  "dir %d first %x %s", from_nodeid,
1129 				  r->res_master_nodeid, r->res_dir_nodeid,
1130 				  r->res_first_lkid, r->res_name);
1131 		}
1132 		return -ENOTBLK;
1133 	} else {
1134 		/* our rsb is not master, but the dir nodeid has sent us a
1135 	   	   request; this could happen with master 0 / res_nodeid -1 */
1136 
1137 		if (r->res_master_nodeid) {
1138 			log_error(ls, "validate master from_dir %d master %d "
1139 				  "first %x %s",
1140 				  from_nodeid, r->res_master_nodeid,
1141 				  r->res_first_lkid, r->res_name);
1142 		}
1143 
1144 		r->res_master_nodeid = dlm_our_nodeid();
1145 		r->res_nodeid = 0;
1146 		return 0;
1147 	}
1148 }
1149 
1150 static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
1151 				int from_nodeid, bool is_inactive, unsigned int flags,
1152 				int *r_nodeid, int *result)
1153 {
1154 	int fix_master = (flags & DLM_LU_RECOVER_MASTER);
1155 	int from_master = (flags & DLM_LU_RECOVER_DIR);
1156 
1157 	if (r->res_dir_nodeid != our_nodeid) {
1158 		/* should not happen, but may as well fix it and carry on */
1159 		log_error(ls, "%s res_dir %d our %d %s", __func__,
1160 			  r->res_dir_nodeid, our_nodeid, r->res_name);
1161 		r->res_dir_nodeid = our_nodeid;
1162 	}
1163 
1164 	if (fix_master && r->res_master_nodeid && dlm_is_removed(ls, r->res_master_nodeid)) {
1165 		/* Recovery uses this function to set a new master when
1166 		 * the previous master failed.  Setting NEW_MASTER will
1167 		 * force dlm_recover_masters to call recover_master on this
1168 		 * rsb even though the res_nodeid is no longer removed.
1169 		 */
1170 
1171 		r->res_master_nodeid = from_nodeid;
1172 		r->res_nodeid = from_nodeid;
1173 		rsb_set_flag(r, RSB_NEW_MASTER);
1174 
1175 		if (is_inactive) {
1176 			/* I don't think we should ever find it inactive. */
1177 			log_error(ls, "%s fix_master inactive", __func__);
1178 			dlm_dump_rsb(r);
1179 		}
1180 	}
1181 
1182 	if (from_master && (r->res_master_nodeid != from_nodeid)) {
1183 		/* this will happen if from_nodeid became master during
1184 		 * a previous recovery cycle, and we aborted the previous
1185 		 * cycle before recovering this master value
1186 		 */
1187 
1188 		log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
1189 			  __func__, from_nodeid, r->res_master_nodeid,
1190 			  r->res_nodeid, r->res_first_lkid, r->res_name);
1191 
1192 		if (r->res_master_nodeid == our_nodeid) {
1193 			log_error(ls, "from_master %d our_master", from_nodeid);
1194 			dlm_dump_rsb(r);
1195 			goto ret_assign;
1196 		}
1197 
1198 		r->res_master_nodeid = from_nodeid;
1199 		r->res_nodeid = from_nodeid;
1200 		rsb_set_flag(r, RSB_NEW_MASTER);
1201 	}
1202 
1203 	if (!r->res_master_nodeid) {
1204 		/* this will happen if recovery happens while we're looking
1205 		 * up the master for this rsb
1206 		 */
1207 
1208 		log_debug(ls, "%s master 0 to %d first %x %s", __func__,
1209 			  from_nodeid, r->res_first_lkid, r->res_name);
1210 		r->res_master_nodeid = from_nodeid;
1211 		r->res_nodeid = from_nodeid;
1212 	}
1213 
1214 	if (!from_master && !fix_master &&
1215 	    (r->res_master_nodeid == from_nodeid)) {
1216 		/* this can happen when the master sends remove, the dir node
1217 		 * finds the rsb on the active list and ignores the remove,
1218 		 * and the former master sends a lookup
1219 		 */
1220 
1221 		log_limit(ls, "%s from master %d flags %x first %x %s",
1222 			  __func__, from_nodeid, flags, r->res_first_lkid,
1223 			  r->res_name);
1224 	}
1225 
1226  ret_assign:
1227 	*r_nodeid = r->res_master_nodeid;
1228 	if (result)
1229 		*result = DLM_LU_MATCH;
1230 }
1231 
1232 /*
1233  * We're the dir node for this res and another node wants to know the
1234  * master nodeid.  During normal operation (non recovery) this is only
1235  * called from receive_lookup(); master lookups when the local node is
1236  * the dir node are done by find_rsb().
1237  *
1238  * normal operation, we are the dir node for a resource
1239  * . _request_lock
1240  * . set_master
1241  * . send_lookup
1242  * . receive_lookup
1243  * . dlm_master_lookup flags 0
1244  *
1245  * recover directory, we are rebuilding dir for all resources
1246  * . dlm_recover_directory
1247  * . dlm_rcom_names
1248  *   remote node sends back the rsb names it is master of and we are dir of
1249  * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
1250  *   we either create new rsb setting remote node as master, or find existing
1251  *   rsb and set master to be the remote node.
1252  *
1253  * recover masters, we are finding the new master for resources
1254  * . dlm_recover_masters
1255  * . recover_master
1256  * . dlm_send_rcom_lookup
1257  * . receive_rcom_lookup
1258  * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
1259  */
1260 
1261 static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
1262 			      int len, unsigned int flags, int *r_nodeid, int *result)
1263 {
1264 	struct dlm_rsb *r = NULL;
1265 	uint32_t hash;
1266 	int our_nodeid = dlm_our_nodeid();
1267 	int dir_nodeid, error;
1268 
1269 	if (len > DLM_RESNAME_MAXLEN)
1270 		return -EINVAL;
1271 
1272 	if (from_nodeid == our_nodeid) {
1273 		log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
1274 			  our_nodeid, flags);
1275 		return -EINVAL;
1276 	}
1277 
1278 	hash = jhash(name, len, 0);
1279 	dir_nodeid = dlm_hash2nodeid(ls, hash);
1280 	if (dir_nodeid != our_nodeid) {
1281 		log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
1282 			  from_nodeid, dir_nodeid, our_nodeid, hash,
1283 			  ls->ls_num_nodes);
1284 		*r_nodeid = -1;
1285 		return -EINVAL;
1286 	}
1287 
1288  retry:
1289 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1290 	if (error)
1291 		goto not_found;
1292 
1293 	/* check if the rsb is active under read lock - likely path */
1294 	read_lock_bh(&ls->ls_rsbtbl_lock);
1295 	if (!rsb_flag(r, RSB_HASHED)) {
1296 		read_unlock_bh(&ls->ls_rsbtbl_lock);
1297 		goto not_found;
1298 	}
1299 
1300 	if (rsb_flag(r, RSB_INACTIVE)) {
1301 		read_unlock_bh(&ls->ls_rsbtbl_lock);
1302 		goto do_inactive;
1303 	}
1304 
1305 	/* because the rsb is active, we need to lock_rsb before
1306 	 * checking/changing res_master_nodeid
1307 	 */
1308 
1309 	hold_rsb(r);
1310 	read_unlock_bh(&ls->ls_rsbtbl_lock);
1311 	lock_rsb(r);
1312 
1313 	__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
1314 			    flags, r_nodeid, result);
1315 
1316 	/* the rsb was active */
1317 	unlock_rsb(r);
1318 	put_rsb(r);
1319 
1320 	return 0;
1321 
1322  do_inactive:
1323 	/* unlikely path - check if still part of ls_rsbtbl */
1324 	write_lock_bh(&ls->ls_rsbtbl_lock);
1325 
1326 	/* see comment in find_rsb_dir */
1327 	if (rsb_flag(r, RSB_HASHED)) {
1328 		if (!rsb_flag(r, RSB_INACTIVE)) {
1329 			write_unlock_bh(&ls->ls_rsbtbl_lock);
1330 			/* something has changed, very unlikely but
1331 			 * try again
1332 			 */
1333 			goto retry;
1334 		}
1335 	} else {
1336 		write_unlock_bh(&ls->ls_rsbtbl_lock);
1337 		goto not_found;
1338 	}
1339 
1340 	/* because the rsb is inactive, it's not refcounted and lock_rsb
1341 	   is not used, but is protected by the rsbtbl lock */
1342 
1343 	__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
1344 			    r_nodeid, result);
1345 
1346 	/* A dir record rsb should never be on the scan list,
1347 	 * except when we are both the dir and master node.
1348 	 * This function should only be called by the dir
1349 	 * node.
1350 	 */
1351 	WARN_ON(!list_empty(&r->res_scan_list) &&
1352 		r->res_master_nodeid != our_nodeid);
1353 
1354 	write_unlock_bh(&ls->ls_rsbtbl_lock);
1355 
1356 	return 0;
1357 
1358  not_found:
1359 	error = get_rsb_struct(ls, name, len, &r);
1360 	if (WARN_ON_ONCE(error))
1361 		goto out;
1362 
1363 	r->res_hash = hash;
1364 	r->res_dir_nodeid = our_nodeid;
1365 	r->res_master_nodeid = from_nodeid;
1366 	r->res_nodeid = from_nodeid;
1367 	rsb_set_flag(r, RSB_INACTIVE);
1368 
1369 	write_lock_bh(&ls->ls_rsbtbl_lock);
1370 	error = rsb_insert(r, &ls->ls_rsbtbl);
1371 	if (error == -EEXIST) {
1372 		/* somebody else was faster and the rsb now
1373 		 * exists, so do a complete relookup
1374 		 */
1375 		write_unlock_bh(&ls->ls_rsbtbl_lock);
1376 		dlm_free_rsb(r);
1377 		goto retry;
1378 	} else if (error) {
1379 		write_unlock_bh(&ls->ls_rsbtbl_lock);
1380 		/* should never happen */
1381 		dlm_free_rsb(r);
1382 		goto retry;
1383 	}
1384 
1385 	list_add(&r->res_slow_list, &ls->ls_slow_inactive);
1386 	write_unlock_bh(&ls->ls_rsbtbl_lock);
1387 
1388 	if (result)
1389 		*result = DLM_LU_ADD;
1390 	*r_nodeid = from_nodeid;
1391  out:
1392 	return error;
1393 }
1394 
1395 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
1396 		      int len, unsigned int flags, int *r_nodeid, int *result)
1397 {
1398 	int rv;
1399 	rcu_read_lock();
1400 	rv = _dlm_master_lookup(ls, from_nodeid, name, len, flags, r_nodeid, result);
1401 	rcu_read_unlock();
1402 	return rv;
1403 }
1404 
1405 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1406 {
1407 	struct dlm_rsb *r;
1408 
1409 	read_lock_bh(&ls->ls_rsbtbl_lock);
1410 	list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
1411 		if (r->res_hash == hash)
1412 			dlm_dump_rsb(r);
1413 	}
1414 	read_unlock_bh(&ls->ls_rsbtbl_lock);
1415 }
1416 
1417 void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
1418 {
1419 	struct dlm_rsb *r = NULL;
1420 	int error;
1421 
1422 	rcu_read_lock();
1423 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1424 	if (error)
1425 		goto out;
1426 
1427 	dlm_dump_rsb(r);
1428  out:
1429 	rcu_read_unlock();
1430 }
1431 
1432 static void deactivate_rsb(struct kref *kref)
1433 {
1434 	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1435 	struct dlm_ls *ls = r->res_ls;
1436 	int our_nodeid = dlm_our_nodeid();
1437 
1438 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1439 	rsb_set_flag(r, RSB_INACTIVE);
1440 	list_move(&r->res_slow_list, &ls->ls_slow_inactive);
1441 
1442 	/*
1443 	 * When the rsb becomes unused, there are two possibilities:
1444 	 * 1. Leave the inactive rsb in place (don't remove it).
1445 	 * 2. Add it to the scan list to be removed.
1446 	 *
1447 	 * 1 is done when the rsb is acting as the dir record
1448 	 * for a remotely mastered rsb.  The rsb must be left
1449 	 * in place as an inactive rsb to act as the dir record.
1450 	 *
1451 	 * 2 is done when a) the rsb is not the master and not the
1452 	 * dir record, b) when the rsb is both the master and the
1453 	 * dir record, c) when the rsb is master but not dir record.
1454 	 *
1455 	 * (If no directory is used, the rsb can always be removed.)
1456 	 */
1457 	if (dlm_no_directory(ls) ||
1458 	    (r->res_master_nodeid == our_nodeid ||
1459 	     dlm_dir_nodeid(r) != our_nodeid))
1460 		add_scan(ls, r);
1461 
1462 	if (r->res_lvbptr) {
1463 		dlm_free_lvb(r->res_lvbptr);
1464 		r->res_lvbptr = NULL;
1465 	}
1466 }
1467 
1468 void free_inactive_rsb(struct dlm_rsb *r)
1469 {
1470 	WARN_ON_ONCE(!rsb_flag(r, RSB_INACTIVE));
1471 
1472 	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1473 	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1474 	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1475 	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1476 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1477 	DLM_ASSERT(list_empty(&r->res_scan_list), dlm_dump_rsb(r););
1478 	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1479 	DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r););
1480 
1481 	dlm_free_rsb(r);
1482 }
1483 
1484 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1485    The rsb must exist as long as any lkb's for it do. */
1486 
1487 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1488 {
1489 	hold_rsb(r);
1490 	lkb->lkb_resource = r;
1491 }
1492 
1493 static void detach_lkb(struct dlm_lkb *lkb)
1494 {
1495 	if (lkb->lkb_resource) {
1496 		put_rsb(lkb->lkb_resource);
1497 		lkb->lkb_resource = NULL;
1498 	}
1499 }
1500 
1501 static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
1502 		       unsigned long start, unsigned long end)
1503 {
1504 	struct xa_limit limit;
1505 	struct dlm_lkb *lkb;
1506 	int rv;
1507 
1508 	limit.max = end;
1509 	limit.min = start;
1510 
1511 	lkb = dlm_allocate_lkb();
1512 	if (!lkb)
1513 		return -ENOMEM;
1514 
1515 	lkb->lkb_last_bast_cb_mode = DLM_LOCK_IV;
1516 	lkb->lkb_last_cast_cb_mode = DLM_LOCK_IV;
1517 	lkb->lkb_last_cb_mode = DLM_LOCK_IV;
1518 	lkb->lkb_nodeid = -1;
1519 	lkb->lkb_grmode = DLM_LOCK_IV;
1520 	kref_init(&lkb->lkb_ref);
1521 	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1522 	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1523 
1524 	write_lock_bh(&ls->ls_lkbxa_lock);
1525 	rv = xa_alloc(&ls->ls_lkbxa, &lkb->lkb_id, lkb, limit, GFP_ATOMIC);
1526 	write_unlock_bh(&ls->ls_lkbxa_lock);
1527 
1528 	if (rv < 0) {
1529 		log_error(ls, "create_lkb xa error %d", rv);
1530 		dlm_free_lkb(lkb);
1531 		return rv;
1532 	}
1533 
1534 	*lkb_ret = lkb;
1535 	return 0;
1536 }
1537 
1538 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1539 {
1540 	return _create_lkb(ls, lkb_ret, 1, ULONG_MAX);
1541 }
1542 
1543 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1544 {
1545 	struct dlm_lkb *lkb;
1546 
1547 	rcu_read_lock();
1548 	lkb = xa_load(&ls->ls_lkbxa, lkid);
1549 	if (lkb) {
1550 		/* check if lkb is still part of lkbxa under lkbxa_lock as
1551 		 * the lkb_ref is tied to the lkbxa data structure, see
1552 		 * __put_lkb().
1553 		 */
1554 		read_lock_bh(&ls->ls_lkbxa_lock);
1555 		if (kref_read(&lkb->lkb_ref))
1556 			kref_get(&lkb->lkb_ref);
1557 		else
1558 			lkb = NULL;
1559 		read_unlock_bh(&ls->ls_lkbxa_lock);
1560 	}
1561 	rcu_read_unlock();
1562 
1563 	*lkb_ret = lkb;
1564 	return lkb ? 0 : -ENOENT;
1565 }
1566 
1567 static void kill_lkb(struct kref *kref)
1568 {
1569 	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1570 
1571 	/* All work is done after the return from kref_put() so we
1572 	   can release the write_lock before the detach_lkb */
1573 
1574 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1575 }
1576 
1577 /* __put_lkb() is used when an lkb may not have an rsb attached to
1578    it so we need to provide the lockspace explicitly */
1579 
1580 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1581 {
1582 	uint32_t lkid = lkb->lkb_id;
1583 	int rv;
1584 
1585 	rv = dlm_kref_put_write_lock_bh(&lkb->lkb_ref, kill_lkb,
1586 					&ls->ls_lkbxa_lock);
1587 	if (rv) {
1588 		xa_erase(&ls->ls_lkbxa, lkid);
1589 		write_unlock_bh(&ls->ls_lkbxa_lock);
1590 
1591 		detach_lkb(lkb);
1592 
1593 		/* for local/process lkbs, lvbptr points to caller's lksb */
1594 		if (lkb->lkb_lvbptr && is_master_copy(lkb))
1595 			dlm_free_lvb(lkb->lkb_lvbptr);
1596 		dlm_free_lkb(lkb);
1597 	}
1598 
1599 	return rv;
1600 }
1601 
1602 int dlm_put_lkb(struct dlm_lkb *lkb)
1603 {
1604 	struct dlm_ls *ls;
1605 
1606 	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1607 	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1608 
1609 	ls = lkb->lkb_resource->res_ls;
1610 	return __put_lkb(ls, lkb);
1611 }
1612 
1613 /* This is only called to add a reference when the code already holds
1614    a valid reference to the lkb, so there's no need for locking. */
1615 
1616 static inline void hold_lkb(struct dlm_lkb *lkb)
1617 {
1618 	kref_get(&lkb->lkb_ref);
1619 }
1620 
1621 static void unhold_lkb_assert(struct kref *kref)
1622 {
1623 	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1624 
1625 	DLM_ASSERT(false, dlm_print_lkb(lkb););
1626 }
1627 
1628 /* This is called when we need to remove a reference and are certain
1629    it's not the last ref.  e.g. del_lkb is always called between a
1630    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1631    put_lkb would work fine, but would involve unnecessary locking */
1632 
1633 static inline void unhold_lkb(struct dlm_lkb *lkb)
1634 {
1635 	kref_put(&lkb->lkb_ref, unhold_lkb_assert);
1636 }
1637 
1638 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1639 			    int mode)
1640 {
1641 	struct dlm_lkb *lkb = NULL, *iter;
1642 
1643 	list_for_each_entry(iter, head, lkb_statequeue)
1644 		if (iter->lkb_rqmode < mode) {
1645 			lkb = iter;
1646 			list_add_tail(new, &iter->lkb_statequeue);
1647 			break;
1648 		}
1649 
1650 	if (!lkb)
1651 		list_add_tail(new, head);
1652 }
1653 
1654 /* add/remove lkb to rsb's grant/convert/wait queue */
1655 
1656 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1657 {
1658 	kref_get(&lkb->lkb_ref);
1659 
1660 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1661 
1662 	lkb->lkb_timestamp = ktime_get();
1663 
1664 	lkb->lkb_status = status;
1665 
1666 	switch (status) {
1667 	case DLM_LKSTS_WAITING:
1668 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1669 			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1670 		else
1671 			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1672 		break;
1673 	case DLM_LKSTS_GRANTED:
1674 		/* convention says granted locks kept in order of grmode */
1675 		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1676 				lkb->lkb_grmode);
1677 		break;
1678 	case DLM_LKSTS_CONVERT:
1679 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1680 			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1681 		else
1682 			list_add_tail(&lkb->lkb_statequeue,
1683 				      &r->res_convertqueue);
1684 		break;
1685 	default:
1686 		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1687 	}
1688 }
1689 
1690 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1691 {
1692 	lkb->lkb_status = 0;
1693 	list_del(&lkb->lkb_statequeue);
1694 	unhold_lkb(lkb);
1695 }
1696 
1697 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1698 {
1699 	del_lkb(r, lkb);
1700 	add_lkb(r, lkb, sts);
1701 }
1702 
1703 static int msg_reply_type(int mstype)
1704 {
1705 	switch (mstype) {
1706 	case DLM_MSG_REQUEST:
1707 		return DLM_MSG_REQUEST_REPLY;
1708 	case DLM_MSG_CONVERT:
1709 		return DLM_MSG_CONVERT_REPLY;
1710 	case DLM_MSG_UNLOCK:
1711 		return DLM_MSG_UNLOCK_REPLY;
1712 	case DLM_MSG_CANCEL:
1713 		return DLM_MSG_CANCEL_REPLY;
1714 	case DLM_MSG_LOOKUP:
1715 		return DLM_MSG_LOOKUP_REPLY;
1716 	}
1717 	return -1;
1718 }
1719 
1720 /* add/remove lkb from global waiters list of lkb's waiting for
1721    a reply from a remote node */
1722 
1723 static void add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1724 {
1725 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1726 
1727 	spin_lock_bh(&ls->ls_waiters_lock);
1728 	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1729 		switch (mstype) {
1730 		case DLM_MSG_UNLOCK:
1731 			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
1732 			break;
1733 		case DLM_MSG_CANCEL:
1734 			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
1735 			break;
1736 		default:
1737 			/* should never happen as validate_lock_args() checks
1738 			 * on lkb_wait_type and validate_unlock_args() only
1739 			 * creates UNLOCK or CANCEL messages.
1740 			 */
1741 			WARN_ON_ONCE(1);
1742 			goto out;
1743 		}
1744 		lkb->lkb_wait_count++;
1745 		hold_lkb(lkb);
1746 
1747 		log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1748 			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
1749 			  lkb->lkb_wait_count, dlm_iflags_val(lkb));
1750 		goto out;
1751 	}
1752 
1753 	DLM_ASSERT(!lkb->lkb_wait_count,
1754 		   dlm_print_lkb(lkb);
1755 		   printk("wait_count %d\n", lkb->lkb_wait_count););
1756 
1757 	lkb->lkb_wait_count++;
1758 	lkb->lkb_wait_type = mstype;
1759 	lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1760 	hold_lkb(lkb);
1761 	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1762  out:
1763 	spin_unlock_bh(&ls->ls_waiters_lock);
1764 }
1765 
1766 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1767    list as part of process_requestqueue (e.g. a lookup that has an optimized
1768    request reply on the requestqueue) between dlm_recover_waiters_pre() which
1769    set RESEND and dlm_recover_waiters_post() */
1770 
1771 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1772 				const struct dlm_message *ms)
1773 {
1774 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1775 	int overlap_done = 0;
1776 
1777 	if (mstype == DLM_MSG_UNLOCK_REPLY &&
1778 	    test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
1779 		log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1780 		overlap_done = 1;
1781 		goto out_del;
1782 	}
1783 
1784 	if (mstype == DLM_MSG_CANCEL_REPLY &&
1785 	    test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1786 		log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1787 		overlap_done = 1;
1788 		goto out_del;
1789 	}
1790 
1791 	/* Cancel state was preemptively cleared by a successful convert,
1792 	   see next comment, nothing to do. */
1793 
1794 	if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1795 	    (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1796 		log_debug(ls, "remwait %x cancel_reply wait_type %d",
1797 			  lkb->lkb_id, lkb->lkb_wait_type);
1798 		return -1;
1799 	}
1800 
1801 	/* Remove for the convert reply, and preemptively remove for the
1802 	   cancel reply.  A convert has been granted while there's still
1803 	   an outstanding cancel on it (the cancel is moot and the result
1804 	   in the cancel reply should be 0).  We preempt the cancel reply
1805 	   because the app gets the convert result and then can follow up
1806 	   with another op, like convert.  This subsequent op would see the
1807 	   lingering state of the cancel and fail with -EBUSY. */
1808 
1809 	if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1810 	    (lkb->lkb_wait_type == DLM_MSG_CONVERT) && ms && !ms->m_result &&
1811 	    test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1812 		log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1813 			  lkb->lkb_id);
1814 		lkb->lkb_wait_type = 0;
1815 		lkb->lkb_wait_count--;
1816 		unhold_lkb(lkb);
1817 		goto out_del;
1818 	}
1819 
1820 	/* N.B. type of reply may not always correspond to type of original
1821 	   msg due to lookup->request optimization, verify others? */
1822 
1823 	if (lkb->lkb_wait_type) {
1824 		lkb->lkb_wait_type = 0;
1825 		goto out_del;
1826 	}
1827 
1828 	log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1829 		  lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
1830 		  lkb->lkb_remid, mstype, dlm_iflags_val(lkb));
1831 	return -1;
1832 
1833  out_del:
1834 	/* the force-unlock/cancel has completed and we haven't received a reply
1835 	   to the op that was in progress prior to the unlock/cancel; we
1836 	   give up on any reply to the earlier op.  FIXME: not sure when/how
1837 	   this would happen */
1838 
1839 	if (overlap_done && lkb->lkb_wait_type) {
1840 		log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1841 			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
1842 		lkb->lkb_wait_count--;
1843 		unhold_lkb(lkb);
1844 		lkb->lkb_wait_type = 0;
1845 	}
1846 
1847 	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1848 
1849 	clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
1850 	lkb->lkb_wait_count--;
1851 	if (!lkb->lkb_wait_count)
1852 		list_del_init(&lkb->lkb_wait_reply);
1853 	unhold_lkb(lkb);
1854 	return 0;
1855 }
1856 
1857 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1858 {
1859 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1860 	int error;
1861 
1862 	spin_lock_bh(&ls->ls_waiters_lock);
1863 	error = _remove_from_waiters(lkb, mstype, NULL);
1864 	spin_unlock_bh(&ls->ls_waiters_lock);
1865 	return error;
1866 }
1867 
1868 /* Handles situations where we might be processing a "fake" or "local" reply in
1869  * the recovery context which stops any locking activity. Only debugfs might
1870  * change the lockspace waiters, but they will hold the recovery lock to ensure
1871  * that remove_from_waiters_ms() in the local case is the only user manipulating
1872  * the lockspace waiters in the recovery context.
1873  */
1874 
1875 static int remove_from_waiters_ms(struct dlm_lkb *lkb,
1876 				  const struct dlm_message *ms, bool local)
1877 {
1878 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1879 	int error;
1880 
1881 	if (!local)
1882 		spin_lock_bh(&ls->ls_waiters_lock);
1883 	else
1884 		WARN_ON_ONCE(!rwsem_is_locked(&ls->ls_in_recovery) ||
1885 			     !dlm_locking_stopped(ls));
1886 	error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
1887 	if (!local)
1888 		spin_unlock_bh(&ls->ls_waiters_lock);
1889 	return error;
1890 }
1891 
1892 /* lkb is master or local copy */
1893 
1894 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1895 {
1896 	int b, len = r->res_ls->ls_lvblen;
1897 
1898 	/* b=1 lvb returned to caller
1899 	   b=0 lvb written to rsb or invalidated
1900 	   b=-1 do nothing */
1901 
1902 	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1903 
1904 	if (b == 1) {
1905 		if (!lkb->lkb_lvbptr)
1906 			return;
1907 
1908 		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1909 			return;
1910 
1911 		if (!r->res_lvbptr)
1912 			return;
1913 
1914 		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1915 		lkb->lkb_lvbseq = r->res_lvbseq;
1916 
1917 	} else if (b == 0) {
1918 		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1919 			rsb_set_flag(r, RSB_VALNOTVALID);
1920 			return;
1921 		}
1922 
1923 		if (!lkb->lkb_lvbptr)
1924 			return;
1925 
1926 		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1927 			return;
1928 
1929 		if (!r->res_lvbptr)
1930 			r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1931 
1932 		if (!r->res_lvbptr)
1933 			return;
1934 
1935 		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1936 		r->res_lvbseq++;
1937 		lkb->lkb_lvbseq = r->res_lvbseq;
1938 		rsb_clear_flag(r, RSB_VALNOTVALID);
1939 	}
1940 
1941 	if (rsb_flag(r, RSB_VALNOTVALID))
1942 		set_bit(DLM_SBF_VALNOTVALID_BIT, &lkb->lkb_sbflags);
1943 }
1944 
1945 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1946 {
1947 	if (lkb->lkb_grmode < DLM_LOCK_PW)
1948 		return;
1949 
1950 	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1951 		rsb_set_flag(r, RSB_VALNOTVALID);
1952 		return;
1953 	}
1954 
1955 	if (!lkb->lkb_lvbptr)
1956 		return;
1957 
1958 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1959 		return;
1960 
1961 	if (!r->res_lvbptr)
1962 		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1963 
1964 	if (!r->res_lvbptr)
1965 		return;
1966 
1967 	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1968 	r->res_lvbseq++;
1969 	rsb_clear_flag(r, RSB_VALNOTVALID);
1970 }
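/* Editorial example, not part of the original source: only a PW or EX
 * holder can update the resource LVB on unlock.  A caller that was granted
 * EX, filled lksb.sb_lvbptr and unlocks with DLM_LKF_VALBLK has its buffer
 * copied into res_lvbptr and res_lvbseq bumped; the same unlock from a PR
 * or NL holder leaves the resource LVB untouched, and a PW/EX holder
 * unlocking with DLM_LKF_IVVALBLK marks the LVB invalid (RSB_VALNOTVALID)
 * instead of writing it. */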
1971 
1972 /* lkb is process copy (pc) */
1973 
1974 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1975 			    const struct dlm_message *ms)
1976 {
1977 	int b;
1978 
1979 	if (!lkb->lkb_lvbptr)
1980 		return;
1981 
1982 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1983 		return;
1984 
1985 	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1986 	if (b == 1) {
1987 		int len = receive_extralen(ms);
1988 		if (len > r->res_ls->ls_lvblen)
1989 			len = r->res_ls->ls_lvblen;
1990 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1991 		lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
1992 	}
1993 }
1994 
1995 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1996    remove_lock -- used for unlock, removes lkb from granted
1997    revert_lock -- used for cancel, moves lkb from convert to granted
1998    grant_lock  -- used for request and convert, adds lkb to granted or
1999                   moves lkb from convert or waiting to granted
2000 
2001    Each of these is used for master or local copy lkb's.  There is
2002    also a _pc() variation used to make the corresponding change on
2003    a process copy (pc) lkb. */
2004 
2005 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2006 {
2007 	del_lkb(r, lkb);
2008 	lkb->lkb_grmode = DLM_LOCK_IV;
2009 	/* this unhold undoes the original ref from create_lkb()
2010 	   so this leads to the lkb being freed */
2011 	unhold_lkb(lkb);
2012 }
2013 
2014 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2015 {
2016 	set_lvb_unlock(r, lkb);
2017 	_remove_lock(r, lkb);
2018 }
2019 
2020 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2021 {
2022 	_remove_lock(r, lkb);
2023 }
2024 
2025 /* returns: 0 did nothing
2026 	    1 moved lock to granted
2027 	   -1 removed lock */
2028 
2029 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2030 {
2031 	int rv = 0;
2032 
2033 	lkb->lkb_rqmode = DLM_LOCK_IV;
2034 
2035 	switch (lkb->lkb_status) {
2036 	case DLM_LKSTS_GRANTED:
2037 		break;
2038 	case DLM_LKSTS_CONVERT:
2039 		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2040 		rv = 1;
2041 		break;
2042 	case DLM_LKSTS_WAITING:
2043 		del_lkb(r, lkb);
2044 		lkb->lkb_grmode = DLM_LOCK_IV;
2045 		/* this unhold undoes the original ref from create_lkb()
2046 		   so this leads to the lkb being freed */
2047 		unhold_lkb(lkb);
2048 		rv = -1;
2049 		break;
2050 	default:
2051 		log_print("invalid status for revert %d", lkb->lkb_status);
2052 	}
2053 	return rv;
2054 }
2055 
2056 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2057 {
2058 	return revert_lock(r, lkb);
2059 }
2060 
2061 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2062 {
2063 	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2064 		lkb->lkb_grmode = lkb->lkb_rqmode;
2065 		if (lkb->lkb_status)
2066 			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2067 		else
2068 			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2069 	}
2070 
2071 	lkb->lkb_rqmode = DLM_LOCK_IV;
2072 	lkb->lkb_highbast = 0;
2073 }
2074 
2075 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2076 {
2077 	set_lvb_lock(r, lkb);
2078 	_grant_lock(r, lkb);
2079 }
2080 
2081 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2082 			  const struct dlm_message *ms)
2083 {
2084 	set_lvb_lock_pc(r, lkb, ms);
2085 	_grant_lock(r, lkb);
2086 }
2087 
2088 /* called by grant_pending_locks() which means an async grant message must
2089    be sent to the requesting node in addition to granting the lock if the
2090    lkb belongs to a remote node. */
2091 
2092 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
2093 {
2094 	grant_lock(r, lkb);
2095 	if (is_master_copy(lkb))
2096 		send_grant(r, lkb);
2097 	else
2098 		queue_cast(r, lkb, 0);
2099 }
2100 
2101 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2102    change the granted/requested modes.  We're munging things accordingly in
2103    the process copy.
2104    CONVDEADLK: our grmode may have been forced down to NL to resolve a
2105    conversion deadlock
2106    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2107    compatible with other granted locks */
2108 
2109 static void munge_demoted(struct dlm_lkb *lkb)
2110 {
2111 	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2112 		log_print("munge_demoted %x invalid modes gr %d rq %d",
2113 			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2114 		return;
2115 	}
2116 
2117 	lkb->lkb_grmode = DLM_LOCK_NL;
2118 }
2119 
2120 static void munge_altmode(struct dlm_lkb *lkb, const struct dlm_message *ms)
2121 {
2122 	if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
2123 	    ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
2124 		log_print("munge_altmode %x invalid reply type %d",
2125 			  lkb->lkb_id, le32_to_cpu(ms->m_type));
2126 		return;
2127 	}
2128 
2129 	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2130 		lkb->lkb_rqmode = DLM_LOCK_PR;
2131 	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2132 		lkb->lkb_rqmode = DLM_LOCK_CW;
2133 	else {
2134 		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2135 		dlm_print_lkb(lkb);
2136 	}
2137 }
2138 
2139 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2140 {
2141 	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2142 					   lkb_statequeue);
2143 	if (lkb->lkb_id == first->lkb_id)
2144 		return 1;
2145 
2146 	return 0;
2147 }
2148 
2149 /* Check if the given lkb conflicts with another lkb on the queue. */
2150 
2151 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2152 {
2153 	struct dlm_lkb *this;
2154 
2155 	list_for_each_entry(this, head, lkb_statequeue) {
2156 		if (this == lkb)
2157 			continue;
2158 		if (!modes_compat(this, lkb))
2159 			return 1;
2160 	}
2161 	return 0;
2162 }
2163 
2164 /*
2165  * "A conversion deadlock arises with a pair of lock requests in the converting
2166  * queue for one resource.  The granted mode of each lock blocks the requested
2167  * mode of the other lock."
2168  *
2169  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2170  * convert queue from being granted, then deadlk/demote lkb.
2171  *
2172  * Example:
2173  * Granted Queue: empty
2174  * Convert Queue: NL->EX (first lock)
2175  *                PR->EX (second lock)
2176  *
2177  * The first lock can't be granted because of the granted mode of the second
2178  * lock and the second lock can't be granted because it's not first in the
2179  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2180  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2181  * flag set and return DEMOTED in the lksb flags.
2182  *
2183  * Originally, this function detected conv-deadlk in a more limited scope:
2184  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2185  * - if lkb1 was the first entry in the queue (not just earlier), and was
2186  *   blocked by the granted mode of lkb2, and there was nothing on the
2187  *   granted queue preventing lkb1 from being granted immediately, i.e.
2188  *   lkb2 was the only thing preventing lkb1 from being granted.
2189  *
2190  * That second condition meant we'd only say there was conv-deadlk if
2191  * resolving it (by demotion) would lead to the first lock on the convert
2192  * queue being granted right away.  It allowed conversion deadlocks to exist
2193  * between locks on the convert queue while they couldn't be granted anyway.
2194  *
2195  * Now, we detect and take action on conversion deadlocks immediately when
2196  * they're created, even if they may not be immediately consequential.  If
2197  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2198  * mode that would prevent lkb1's conversion from being granted, we do a
2199  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2200  * I think this means that the lkb_is_ahead condition below should always
2201  * be zero, i.e. there will never be conv-deadlk between two locks that are
2202  * both already on the convert queue.
2203  */
2204 
2205 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2206 {
2207 	struct dlm_lkb *lkb1;
2208 	int lkb_is_ahead = 0;
2209 
2210 	list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2211 		if (lkb1 == lkb2) {
2212 			lkb_is_ahead = 1;
2213 			continue;
2214 		}
2215 
2216 		if (!lkb_is_ahead) {
2217 			if (!modes_compat(lkb2, lkb1))
2218 				return 1;
2219 		} else {
2220 			if (!modes_compat(lkb2, lkb1) &&
2221 			    !modes_compat(lkb1, lkb2))
2222 				return 1;
2223 		}
2224 	}
2225 	return 0;
2226 }
2227 
2228 /*
2229  * Return 1 if the lock can be granted, 0 otherwise.
2230  * Also detect and resolve conversion deadlocks.
2231  *
2232  * lkb is the lock to be granted
2233  *
2234  * now is 1 if the function is being called in the context of the
2235  * immediate request, it is 0 if called later, after the lock has been
2236  * queued.
2237  *
2238  * recover is 1 if dlm_recover_grant() is trying to grant conversions
2239  * after recovery.
2240  *
2241  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2242  */
2243 
2244 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2245 			   int recover)
2246 {
2247 	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2248 
2249 	/*
2250 	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
2251 	 * a new request for an NL mode lock being blocked.
2252 	 *
2253 	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2254 	 * request, then it would be granted.  In essence, the use of this flag
2255 	 * tells the Lock Manager to expedite this request by not considering
2256 	 * what may be in the CONVERTING or WAITING queues...  As of this
2257 	 * writing, the EXPEDITE flag can be used only with new requests for NL
2258 	 * mode locks.  This flag is not valid for conversion requests.
2259 	 *
2260 	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
2261 	 * conversion or used with a non-NL requested mode.  We also know an
2262 	 * EXPEDITE request is always granted immediately, so now must always
2263 	 * be 1.  The full condition to grant an expedite request: (now &&
2264 	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2265 	 * therefore be shortened to just checking the flag.
2266 	 */
2267 
2268 	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2269 		return 1;
2270 
2271 	/*
2272 	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2273 	 * added to the remaining conditions.
2274 	 */
2275 
2276 	if (queue_conflict(&r->res_grantqueue, lkb))
2277 		return 0;
2278 
2279 	/*
2280 	 * 6-3: By default, a conversion request is immediately granted if the
2281 	 * requested mode is compatible with the modes of all other granted
2282 	 * locks
2283 	 */
2284 
2285 	if (queue_conflict(&r->res_convertqueue, lkb))
2286 		return 0;
2287 
2288 	/*
2289 	 * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2290 	 * locks for a recovered rsb, on which lkb's have been rebuilt.
2291 	 * The lkb's may have been rebuilt on the queues in a different
2292 	 * order than they were in on the previous master.  So, granting
2293 	 * queued conversions in order after recovery doesn't make sense
2294 	 * since the order hasn't been preserved anyway.  The new order
2295 	 * could also have created a new "in place" conversion deadlock.
2296 	 * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2297 	 * After recovery, there would be no granted locks, and possibly
2298 	 * NL->EX, PR->EX, an in-place conversion deadlock.)  So, after
2299 	 * recovery, grant conversions without considering order.
2300 	 */
2301 
2302 	if (conv && recover)
2303 		return 1;
2304 
2305 	/*
2306 	 * 6-5: But the default algorithm for deciding whether to grant or
2307 	 * queue conversion requests does not by itself guarantee that such
2308 	 * requests are serviced on a "first come first serve" basis.  This, in
2309 	 * turn, can lead to a phenomenon known as "indefinite postponement".
2310 	 *
2311 	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
2312 	 * the system service employed to request a lock conversion.  This flag
2313 	 * forces certain conversion requests to be queued, even if they are
2314 	 * compatible with the granted modes of other locks on the same
2315 	 * resource.  Thus, the use of this flag results in conversion requests
2316 	 * being ordered on a "first come first serve" basis.
2317 	 *
2318 	 * DCT: This condition is all about new conversions being able to occur
2319 	 * "in place" while the lock remains on the granted queue (assuming
2320 	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
2321 	 * doesn't _have_ to go onto the convert queue where it's processed in
2322 	 * order.  The "now" variable is necessary to distinguish converts
2323 	 * being received and processed for the first time now, because once a
2324 	 * convert is moved to the conversion queue the condition below applies
2325 	 * requiring fifo granting.
2326 	 */
2327 
2328 	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2329 		return 1;
2330 
2331 	/*
2332 	 * Even if the convert is compat with all granted locks,
2333 	 * QUECVT forces it behind other locks on the convert queue.
2334 	 */
2335 
2336 	if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2337 		if (list_empty(&r->res_convertqueue))
2338 			return 1;
2339 		else
2340 			return 0;
2341 	}
2342 
2343 	/*
2344 	 * The NOORDER flag is set to avoid the standard vms rules on grant
2345 	 * order.
2346 	 */
2347 
2348 	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2349 		return 1;
2350 
2351 	/*
2352 	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2353 	 * granted until all other conversion requests ahead of it are granted
2354 	 * and/or canceled.
2355 	 */
2356 
2357 	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2358 		return 1;
2359 
2360 	/*
2361 	 * 6-4: By default, a new request is immediately granted only if all
2362 	 * three of the following conditions are satisfied when the request is
2363 	 * issued:
2364 	 * - The queue of ungranted conversion requests for the resource is
2365 	 *   empty.
2366 	 * - The queue of ungranted new requests for the resource is empty.
2367 	 * - The mode of the new request is compatible with the most
2368 	 *   restrictive mode of all granted locks on the resource.
2369 	 */
2370 
2371 	if (now && !conv && list_empty(&r->res_convertqueue) &&
2372 	    list_empty(&r->res_waitqueue))
2373 		return 1;
2374 
2375 	/*
2376 	 * 6-4: Once a lock request is in the queue of ungranted new requests,
2377 	 * it cannot be granted until the queue of ungranted conversion
2378 	 * requests is empty, all ungranted new requests ahead of it are
2379 	 * granted and/or canceled, and it is compatible with the granted mode
2380 	 * of the most restrictive lock granted on the resource.
2381 	 */
2382 
2383 	if (!now && !conv && list_empty(&r->res_convertqueue) &&
2384 	    first_in_list(lkb, &r->res_waitqueue))
2385 		return 1;
2386 
2387 	return 0;
2388 }
2389 
2390 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2391 			  int recover, int *err)
2392 {
2393 	int rv;
2394 	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2395 	int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2396 
2397 	if (err)
2398 		*err = 0;
2399 
2400 	rv = _can_be_granted(r, lkb, now, recover);
2401 	if (rv)
2402 		goto out;
2403 
2404 	/*
2405 	 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2406 	 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2407 	 * cancels one of the locks.
2408 	 */
2409 
2410 	if (is_convert && can_be_queued(lkb) &&
2411 	    conversion_deadlock_detect(r, lkb)) {
2412 		if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2413 			lkb->lkb_grmode = DLM_LOCK_NL;
2414 			set_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
2415 		} else if (err) {
2416 			*err = -EDEADLK;
2417 		} else {
2418 			log_print("can_be_granted deadlock %x now %d",
2419 				  lkb->lkb_id, now);
2420 			dlm_dump_rsb(r);
2421 		}
2422 		goto out;
2423 	}
2424 
2425 	/*
2426 	 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2427 	 * to grant a request in a mode other than the normal rqmode.  It's a
2428 	 * simple way to provide a big optimization to applications that can
2429 	 * use them.
2430 	 */
2431 
2432 	if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2433 		alt = DLM_LOCK_PR;
2434 	else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2435 		alt = DLM_LOCK_CW;
2436 
2437 	if (alt) {
2438 		lkb->lkb_rqmode = alt;
2439 		rv = _can_be_granted(r, lkb, now, 0);
2440 		if (rv)
2441 			set_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
2442 		else
2443 			lkb->lkb_rqmode = rqmode;
2444 	}
2445  out:
2446 	return rv;
2447 }
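/* Editorial example, not part of the original source ("ls", "lksb",
 * "my_ast" and "my_bast" are hypothetical): a caller that wants EX but can
 * live with PR may request
 *
 *	error = dlm_lock(ls, DLM_LOCK_EX, &lksb,
 *			 DLM_LKF_ALTPR | DLM_LKF_NOQUEUE, "myres", 5,
 *			 0, my_ast, &lksb, my_bast);
 *
 * If EX cannot be granted but PR can, the lock is granted in PR and
 * DLM_SBF_ALTMODE is set in lksb.sb_flags so the caller knows the
 * alternate mode was used; otherwise rqmode is restored and the request
 * queues or fails as usual. */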
2448 
2449 /* Returns the highest requested mode of all blocked conversions; sets
2450    cw if there's a blocked conversion to DLM_LOCK_CW. */
2451 
2452 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2453 				 unsigned int *count)
2454 {
2455 	struct dlm_lkb *lkb, *s;
2456 	int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2457 	int hi, demoted, quit, grant_restart, demote_restart;
2458 	int deadlk;
2459 
2460 	quit = 0;
2461  restart:
2462 	grant_restart = 0;
2463 	demote_restart = 0;
2464 	hi = DLM_LOCK_IV;
2465 
2466 	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2467 		demoted = is_demoted(lkb);
2468 		deadlk = 0;
2469 
2470 		if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2471 			grant_lock_pending(r, lkb);
2472 			grant_restart = 1;
2473 			if (count)
2474 				(*count)++;
2475 			continue;
2476 		}
2477 
2478 		if (!demoted && is_demoted(lkb)) {
2479 			log_print("WARN: pending demoted %x node %d %s",
2480 				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2481 			demote_restart = 1;
2482 			continue;
2483 		}
2484 
2485 		if (deadlk) {
2486 			/*
2487 			 * If the DLM_LKF_NODLCKWT flag is set and a conversion
2488 			 * deadlock is detected, we queue a blocking AST so the
2489 			 * caller can demote (or cancel) the conversion.
2490 			 */
2491 			if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2492 				if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2493 					queue_bast(r, lkb, lkb->lkb_rqmode);
2494 					lkb->lkb_highbast = lkb->lkb_rqmode;
2495 				}
2496 			} else {
2497 				log_print("WARN: pending deadlock %x node %d %s",
2498 					  lkb->lkb_id, lkb->lkb_nodeid,
2499 					  r->res_name);
2500 				dlm_dump_rsb(r);
2501 			}
2502 			continue;
2503 		}
2504 
2505 		hi = max_t(int, lkb->lkb_rqmode, hi);
2506 
2507 		if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2508 			*cw = 1;
2509 	}
2510 
2511 	if (grant_restart)
2512 		goto restart;
2513 	if (demote_restart && !quit) {
2514 		quit = 1;
2515 		goto restart;
2516 	}
2517 
2518 	return max_t(int, high, hi);
2519 }
2520 
2521 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2522 			      unsigned int *count)
2523 {
2524 	struct dlm_lkb *lkb, *s;
2525 
2526 	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2527 		if (can_be_granted(r, lkb, 0, 0, NULL)) {
2528 			grant_lock_pending(r, lkb);
2529 			if (count)
2530 				(*count)++;
2531 		} else {
2532 			high = max_t(int, lkb->lkb_rqmode, high);
2533 			if (lkb->lkb_rqmode == DLM_LOCK_CW)
2534 				*cw = 1;
2535 		}
2536 	}
2537 
2538 	return high;
2539 }
2540 
2541 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2542    on either the convert or waiting queue.
2543    high is the largest rqmode of all locks blocked on the convert or
2544    waiting queue. */
2545 
2546 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2547 {
2548 	if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2549 		if (gr->lkb_highbast < DLM_LOCK_EX)
2550 			return 1;
2551 		return 0;
2552 	}
2553 
2554 	if (gr->lkb_highbast < high &&
2555 	    !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2556 		return 1;
2557 	return 0;
2558 }
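/* Editorial example, not part of the original source: a lock granted in PR
 * gets a blocking AST when the highest blocked mode (high) is CW, PW or EX
 * and is above lkb_highbast, since PR is incompatible with each of those;
 * when high is only CR or PR the modes are compatible and no bast is
 * needed.  The first branch handles a blocked CW request while high is PR:
 * PR is compatible with PR, so the second test stays silent, but the PR
 * holder still blocks the CW request and must be told (provided
 * lkb_highbast is below EX). */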
2559 
2560 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2561 {
2562 	struct dlm_lkb *lkb, *s;
2563 	int high = DLM_LOCK_IV;
2564 	int cw = 0;
2565 
2566 	if (!is_master(r)) {
2567 		log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2568 		dlm_dump_rsb(r);
2569 		return;
2570 	}
2571 
2572 	high = grant_pending_convert(r, high, &cw, count);
2573 	high = grant_pending_wait(r, high, &cw, count);
2574 
2575 	if (high == DLM_LOCK_IV)
2576 		return;
2577 
2578 	/*
2579 	 * If there are locks left on the wait/convert queue then send blocking
2580 	 * ASTs to granted locks based on the largest requested mode (high)
2581 	 * found above.
2582 	 */
2583 
2584 	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2585 		if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2586 			if (cw && high == DLM_LOCK_PR &&
2587 			    lkb->lkb_grmode == DLM_LOCK_PR)
2588 				queue_bast(r, lkb, DLM_LOCK_CW);
2589 			else
2590 				queue_bast(r, lkb, high);
2591 			lkb->lkb_highbast = high;
2592 		}
2593 	}
2594 }
2595 
2596 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2597 {
2598 	if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2599 	    (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2600 		if (gr->lkb_highbast < DLM_LOCK_EX)
2601 			return 1;
2602 		return 0;
2603 	}
2604 
2605 	if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2606 		return 1;
2607 	return 0;
2608 }
2609 
2610 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2611 			    struct dlm_lkb *lkb)
2612 {
2613 	struct dlm_lkb *gr;
2614 
2615 	list_for_each_entry(gr, head, lkb_statequeue) {
2616 		/* skip self when sending basts to convertqueue */
2617 		if (gr == lkb)
2618 			continue;
2619 		if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2620 			queue_bast(r, gr, lkb->lkb_rqmode);
2621 			gr->lkb_highbast = lkb->lkb_rqmode;
2622 		}
2623 	}
2624 }
2625 
2626 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2627 {
2628 	send_bast_queue(r, &r->res_grantqueue, lkb);
2629 }
2630 
2631 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2632 {
2633 	send_bast_queue(r, &r->res_grantqueue, lkb);
2634 	send_bast_queue(r, &r->res_convertqueue, lkb);
2635 }
2636 
2637 /* set_master(r, lkb) -- set the master nodeid of a resource
2638 
2639    The purpose of this function is to set the nodeid field in the given
2640    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2641    known, it can just be copied to the lkb and the function will return
2642    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2643    before it can be copied to the lkb.
2644 
2645    When the rsb nodeid is being looked up remotely, the initial lkb
2646    causing the lookup is kept on the ls_waiters list waiting for the
2647    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2648    on the rsb's res_lookup list until the master is verified.
2649 
2650    Return values:
2651    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2652    1: the rsb master is not available and the lkb has been placed on
2653       a wait queue
2654 */
2655 
2656 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2657 {
2658 	int our_nodeid = dlm_our_nodeid();
2659 
2660 	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2661 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2662 		r->res_first_lkid = lkb->lkb_id;
2663 		lkb->lkb_nodeid = r->res_nodeid;
2664 		return 0;
2665 	}
2666 
2667 	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2668 		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2669 		return 1;
2670 	}
2671 
2672 	if (r->res_master_nodeid == our_nodeid) {
2673 		lkb->lkb_nodeid = 0;
2674 		return 0;
2675 	}
2676 
2677 	if (r->res_master_nodeid) {
2678 		lkb->lkb_nodeid = r->res_master_nodeid;
2679 		return 0;
2680 	}
2681 
2682 	if (dlm_dir_nodeid(r) == our_nodeid) {
2683 		/* This is a somewhat unusual case; find_rsb will usually
2684 		   have set res_master_nodeid when dir nodeid is local, but
2685 		   there are cases where we become the dir node after we've
2686 		   passed find_rsb and go through _request_lock again.
2687 		   confirm_master() or process_lookup_list() needs to be
2688 		   called after this. */
2689 		log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2690 			  lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2691 			  r->res_name);
2692 		r->res_master_nodeid = our_nodeid;
2693 		r->res_nodeid = 0;
2694 		lkb->lkb_nodeid = 0;
2695 		return 0;
2696 	}
2697 
2698 	r->res_first_lkid = lkb->lkb_id;
2699 	send_lookup(r, lkb);
2700 	return 1;
2701 }
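/* Editorial note, not part of the original source: callers treat the two
 * return values as "master known" vs "lookup in flight".  _request_lock()
 * below proceeds to do_request()/send_request() on 0, and simply returns
 * on 1, leaving the lkb parked on the lockspace waiters list (the lkb that
 * triggered the lookup) or on res_lookup (lkb's queued behind it) until
 * the lookup reply arrives and confirm_master()/process_lookup_list()
 * restarts them. */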
2702 
2703 static void process_lookup_list(struct dlm_rsb *r)
2704 {
2705 	struct dlm_lkb *lkb, *safe;
2706 
2707 	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2708 		list_del_init(&lkb->lkb_rsb_lookup);
2709 		_request_lock(r, lkb);
2710 	}
2711 }
2712 
2713 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2714 
2715 static void confirm_master(struct dlm_rsb *r, int error)
2716 {
2717 	struct dlm_lkb *lkb;
2718 
2719 	if (!r->res_first_lkid)
2720 		return;
2721 
2722 	switch (error) {
2723 	case 0:
2724 	case -EINPROGRESS:
2725 		r->res_first_lkid = 0;
2726 		process_lookup_list(r);
2727 		break;
2728 
2729 	case -EAGAIN:
2730 	case -EBADR:
2731 	case -ENOTBLK:
2732 		/* the remote request failed and won't be retried (it was
2733 		   a NOQUEUE, or has been canceled/unlocked); make a waiting
2734 		   lkb the first_lkid */
2735 
2736 		r->res_first_lkid = 0;
2737 
2738 		if (!list_empty(&r->res_lookup)) {
2739 			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2740 					 lkb_rsb_lookup);
2741 			list_del_init(&lkb->lkb_rsb_lookup);
2742 			r->res_first_lkid = lkb->lkb_id;
2743 			_request_lock(r, lkb);
2744 		}
2745 		break;
2746 
2747 	default:
2748 		log_error(r->res_ls, "confirm_master unknown error %d", error);
2749 	}
2750 }
2751 
2752 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2753 			 int namelen, void (*ast)(void *astparam),
2754 			 void *astparam,
2755 			 void (*bast)(void *astparam, int mode),
2756 			 struct dlm_args *args)
2757 {
2758 	int rv = -EINVAL;
2759 
2760 	/* check for invalid arg usage */
2761 
2762 	if (mode < 0 || mode > DLM_LOCK_EX)
2763 		goto out;
2764 
2765 	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2766 		goto out;
2767 
2768 	if (flags & DLM_LKF_CANCEL)
2769 		goto out;
2770 
2771 	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2772 		goto out;
2773 
2774 	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2775 		goto out;
2776 
2777 	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2778 		goto out;
2779 
2780 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2781 		goto out;
2782 
2783 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2784 		goto out;
2785 
2786 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2787 		goto out;
2788 
2789 	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2790 		goto out;
2791 
2792 	if (!ast || !lksb)
2793 		goto out;
2794 
2795 	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2796 		goto out;
2797 
2798 	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2799 		goto out;
2800 
2801 	/* these args will be copied to the lkb in validate_lock_args,
2802 	   it cannot be done now because when converting locks, fields in
2803 	   an active lkb cannot be modified before locking the rsb */
2804 
2805 	args->flags = flags;
2806 	args->astfn = ast;
2807 	args->astparam = astparam;
2808 	args->bastfn = bast;
2809 	args->mode = mode;
2810 	args->lksb = lksb;
2811 	rv = 0;
2812  out:
2813 	return rv;
2814 }
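/* Editorial examples of rejected argument combinations, not part of the
 * original source ("ls", "lksb" and "my_ast" are hypothetical).  All of
 * the following fail the checks above with -EINVAL:
 *
 *	// EXPEDITE is only valid for new NL requests
 *	dlm_lock(ls, DLM_LOCK_PR, &lksb, DLM_LKF_EXPEDITE, "res", 3,
 *		 0, my_ast, &lksb, NULL);
 *
 *	// QUECVT without CONVERT
 *	dlm_lock(ls, DLM_LOCK_EX, &lksb, DLM_LKF_QUECVT, "res", 3,
 *		 0, my_ast, &lksb, NULL);
 *
 *	// VALBLK with no LVB buffer in the lksb
 *	lksb.sb_lvbptr = NULL;
 *	dlm_lock(ls, DLM_LOCK_EX, &lksb, DLM_LKF_VALBLK, "res", 3,
 *		 0, my_ast, &lksb, NULL);
 *
 * A conversion (DLM_LKF_CONVERT) whose lksb carries no sb_lkid from the
 * original grant is rejected the same way. */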
2815 
2816 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2817 {
2818 	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2819  		      DLM_LKF_FORCEUNLOCK))
2820 		return -EINVAL;
2821 
2822 	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2823 		return -EINVAL;
2824 
2825 	args->flags = flags;
2826 	args->astparam = astarg;
2827 	return 0;
2828 }
2829 
2830 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2831 			      struct dlm_args *args)
2832 {
2833 	int rv = -EBUSY;
2834 
2835 	if (args->flags & DLM_LKF_CONVERT) {
2836 		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2837 			goto out;
2838 
2839 		/* lock not allowed if there's any op in progress */
2840 		if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2841 			goto out;
2842 
2843 		if (is_overlap(lkb))
2844 			goto out;
2845 
2846 		rv = -EINVAL;
2847 		if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
2848 			goto out;
2849 
2850 		if (args->flags & DLM_LKF_QUECVT &&
2851 		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2852 			goto out;
2853 	}
2854 
2855 	lkb->lkb_exflags = args->flags;
2856 	dlm_set_sbflags_val(lkb, 0);
2857 	lkb->lkb_astfn = args->astfn;
2858 	lkb->lkb_astparam = args->astparam;
2859 	lkb->lkb_bastfn = args->bastfn;
2860 	lkb->lkb_rqmode = args->mode;
2861 	lkb->lkb_lksb = args->lksb;
2862 	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2863 	lkb->lkb_ownpid = (int) current->pid;
2864 	rv = 0;
2865  out:
2866 	switch (rv) {
2867 	case 0:
2868 		break;
2869 	case -EINVAL:
2870 		/* annoy the user because dlm usage is wrong */
2871 		WARN_ON(1);
2872 		log_error(ls, "%s %d %x %x %x %d %d", __func__,
2873 			  rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2874 			  lkb->lkb_status, lkb->lkb_wait_type);
2875 		break;
2876 	default:
2877 		log_debug(ls, "%s %d %x %x %x %d %d", __func__,
2878 			  rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2879 			  lkb->lkb_status, lkb->lkb_wait_type);
2880 		break;
2881 	}
2882 
2883 	return rv;
2884 }
2885 
2886 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2887    for success */
2888 
2889 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2890    because there may be a lookup in progress and it's valid to do
2891    cancel/force-unlock on it */
2892 
2893 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2894 {
2895 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2896 	int rv = -EBUSY;
2897 
2898 	/* normal unlock not allowed if there's any op in progress */
2899 	if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) &&
2900 	    (lkb->lkb_wait_type || lkb->lkb_wait_count))
2901 		goto out;
2902 
2903 	/* an lkb may be waiting for an rsb lookup to complete where the
2904 	   lookup was initiated by another lock */
2905 
2906 	if (!list_empty(&lkb->lkb_rsb_lookup)) {
2907 		if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2908 			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2909 			list_del_init(&lkb->lkb_rsb_lookup);
2910 			queue_cast(lkb->lkb_resource, lkb,
2911 				   args->flags & DLM_LKF_CANCEL ?
2912 				   -DLM_ECANCEL : -DLM_EUNLOCK);
2913 			unhold_lkb(lkb); /* undoes create_lkb() */
2914 		}
2915 		/* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2916 		goto out;
2917 	}
2918 
2919 	rv = -EINVAL;
2920 	if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
2921 		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2922 		dlm_print_lkb(lkb);
2923 		goto out;
2924 	}
2925 
2926 	/* an lkb may still exist even though the lock is EOL'ed due to a
2927 	 * cancel, unlock or failed noqueue request; an app can't use these
2928 	 * locks; return same error as if the lkid had not been found at all
2929 	 */
2930 
2931 	if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
2932 		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2933 		rv = -ENOENT;
2934 		goto out;
2935 	}
2936 
2937 	if (is_overlap_unlock(lkb))
2938 		goto out;
2939 
2940 	/* cancel not allowed with another cancel/unlock in progress */
2941 
2942 	if (args->flags & DLM_LKF_CANCEL) {
2943 		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2944 			goto out;
2945 
2946 		if (is_overlap_cancel(lkb))
2947 			goto out;
2948 
2949 		if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2950 			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2951 			rv = -EBUSY;
2952 			goto out;
2953 		}
2954 
2955 		/* there's nothing to cancel */
2956 		if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2957 		    !lkb->lkb_wait_type) {
2958 			rv = -EBUSY;
2959 			goto out;
2960 		}
2961 
2962 		switch (lkb->lkb_wait_type) {
2963 		case DLM_MSG_LOOKUP:
2964 		case DLM_MSG_REQUEST:
2965 			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2966 			rv = -EBUSY;
2967 			goto out;
2968 		case DLM_MSG_UNLOCK:
2969 		case DLM_MSG_CANCEL:
2970 			goto out;
2971 		}
2972 		/* add_to_waiters() will set OVERLAP_CANCEL */
2973 		goto out_ok;
2974 	}
2975 
2976 	/* do we need to allow a force-unlock if there's a normal unlock
2977 	   already in progress?  in what conditions could the normal unlock
2978 	   fail such that we'd want to send a force-unlock to be sure? */
2979 
2980 	if (args->flags & DLM_LKF_FORCEUNLOCK) {
2981 		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2982 			goto out;
2983 
2984 		if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2985 			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2986 			rv = -EBUSY;
2987 			goto out;
2988 		}
2989 
2990 		switch (lkb->lkb_wait_type) {
2991 		case DLM_MSG_LOOKUP:
2992 		case DLM_MSG_REQUEST:
2993 			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2994 			rv = -EBUSY;
2995 			goto out;
2996 		case DLM_MSG_UNLOCK:
2997 			goto out;
2998 		}
2999 		/* add_to_waiters() will set OVERLAP_UNLOCK */
3000 	}
3001 
3002  out_ok:
3003 	/* an overlapping op shouldn't blow away exflags from other op */
3004 	lkb->lkb_exflags |= args->flags;
3005 	dlm_set_sbflags_val(lkb, 0);
3006 	lkb->lkb_astparam = args->astparam;
3007 	rv = 0;
3008  out:
3009 	switch (rv) {
3010 	case 0:
3011 		break;
3012 	case -EINVAL:
3013 		/* annoy the user because dlm usage is wrong */
3014 		WARN_ON(1);
3015 		log_error(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
3016 			  lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
3017 			  args->flags, lkb->lkb_wait_type,
3018 			  lkb->lkb_resource->res_name);
3019 		break;
3020 	default:
3021 		log_debug(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
3022 			  lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
3023 			  args->flags, lkb->lkb_wait_type,
3024 			  lkb->lkb_resource->res_name);
3025 		break;
3026 	}
3027 
3028 	return rv;
3029 }
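/* Editorial example, not part of the original source: if an application
 * calls dlm_unlock() with DLM_LKF_CANCEL while the lkb is still waiting
 * for its original request reply (lkb_wait_type DLM_MSG_REQUEST), the
 * switch above sets DLM_IFL_OVERLAP_CANCEL and returns -EBUSY;
 * dlm_unlock() maps that -EBUSY to 0 for CANCEL/FORCEUNLOCK, and the
 * overlapped cancel is resolved when the outstanding reply is
 * processed. */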
3030 
3031 /*
3032  * Four stage 4 varieties:
3033  * do_request(), do_convert(), do_unlock(), do_cancel()
3034  * These are called on the master node for the given lock and
3035  * from the central locking logic.
3036  */
3037 
3038 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3039 {
3040 	int error = 0;
3041 
3042 	if (can_be_granted(r, lkb, 1, 0, NULL)) {
3043 		grant_lock(r, lkb);
3044 		queue_cast(r, lkb, 0);
3045 		goto out;
3046 	}
3047 
3048 	if (can_be_queued(lkb)) {
3049 		error = -EINPROGRESS;
3050 		add_lkb(r, lkb, DLM_LKSTS_WAITING);
3051 		goto out;
3052 	}
3053 
3054 	error = -EAGAIN;
3055 	queue_cast(r, lkb, -EAGAIN);
3056  out:
3057 	return error;
3058 }
3059 
3060 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3061 			       int error)
3062 {
3063 	switch (error) {
3064 	case -EAGAIN:
3065 		if (force_blocking_asts(lkb))
3066 			send_blocking_asts_all(r, lkb);
3067 		break;
3068 	case -EINPROGRESS:
3069 		send_blocking_asts(r, lkb);
3070 		break;
3071 	}
3072 }
3073 
3074 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3075 {
3076 	int error = 0;
3077 	int deadlk = 0;
3078 
3079 	/* changing an existing lock may allow others to be granted */
3080 
3081 	if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3082 		grant_lock(r, lkb);
3083 		queue_cast(r, lkb, 0);
3084 		goto out;
3085 	}
3086 
3087 	/* can_be_granted() detected that this lock would block in a conversion
3088 	   deadlock, so we leave it on the granted queue and return EDEADLK in
3089 	   the ast for the convert. */
3090 
3091 	if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
3092 		/* it's left on the granted queue */
3093 		revert_lock(r, lkb);
3094 		queue_cast(r, lkb, -EDEADLK);
3095 		error = -EDEADLK;
3096 		goto out;
3097 	}
3098 
3099 	/* is_demoted() means the can_be_granted() above set the grmode
3100 	   to NL, and left us on the granted queue.  This auto-demotion
3101 	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
3102 	   now grantable.  We have to try to grant other converting locks
3103 	   before we try again to grant this one. */
3104 
3105 	if (is_demoted(lkb)) {
3106 		grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3107 		if (_can_be_granted(r, lkb, 1, 0)) {
3108 			grant_lock(r, lkb);
3109 			queue_cast(r, lkb, 0);
3110 			goto out;
3111 		}
3112 		/* else fall through and move to convert queue */
3113 	}
3114 
3115 	if (can_be_queued(lkb)) {
3116 		error = -EINPROGRESS;
3117 		del_lkb(r, lkb);
3118 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3119 		goto out;
3120 	}
3121 
3122 	error = -EAGAIN;
3123 	queue_cast(r, lkb, -EAGAIN);
3124  out:
3125 	return error;
3126 }
3127 
3128 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3129 			       int error)
3130 {
3131 	switch (error) {
3132 	case 0:
3133 		grant_pending_locks(r, NULL);
3134 		/* grant_pending_locks also sends basts */
3135 		break;
3136 	case -EAGAIN:
3137 		if (force_blocking_asts(lkb))
3138 			send_blocking_asts_all(r, lkb);
3139 		break;
3140 	case -EINPROGRESS:
3141 		send_blocking_asts(r, lkb);
3142 		break;
3143 	}
3144 }
3145 
3146 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3147 {
3148 	remove_lock(r, lkb);
3149 	queue_cast(r, lkb, -DLM_EUNLOCK);
3150 	return -DLM_EUNLOCK;
3151 }
3152 
3153 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3154 			      int error)
3155 {
3156 	grant_pending_locks(r, NULL);
3157 }
3158 
3159 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3160 
3161 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3162 {
3163 	int error;
3164 
3165 	error = revert_lock(r, lkb);
3166 	if (error) {
3167 		queue_cast(r, lkb, -DLM_ECANCEL);
3168 		return -DLM_ECANCEL;
3169 	}
3170 	return 0;
3171 }
3172 
3173 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3174 			      int error)
3175 {
3176 	if (error)
3177 		grant_pending_locks(r, NULL);
3178 }
3179 
3180 /*
3181  * Four stage 3 varieties:
3182  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3183  */
3184 
3185 /* add a new lkb to a possibly new rsb, called by requesting process */
3186 
3187 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3188 {
3189 	int error;
3190 
3191 	/* set_master: sets lkb nodeid from r */
3192 
3193 	error = set_master(r, lkb);
3194 	if (error < 0)
3195 		goto out;
3196 	if (error) {
3197 		error = 0;
3198 		goto out;
3199 	}
3200 
3201 	if (is_remote(r)) {
3202 		/* receive_request() calls do_request() on remote node */
3203 		error = send_request(r, lkb);
3204 	} else {
3205 		error = do_request(r, lkb);
3206 		/* for remote locks the request_reply is sent
3207 		   between do_request and do_request_effects */
3208 		do_request_effects(r, lkb, error);
3209 	}
3210  out:
3211 	return error;
3212 }
3213 
3214 /* change some property of an existing lkb, e.g. mode */
3215 
3216 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3217 {
3218 	int error;
3219 
3220 	if (is_remote(r)) {
3221 		/* receive_convert() calls do_convert() on remote node */
3222 		error = send_convert(r, lkb);
3223 	} else {
3224 		error = do_convert(r, lkb);
3225 		/* for remote locks the convert_reply is sent
3226 		   between do_convert and do_convert_effects */
3227 		do_convert_effects(r, lkb, error);
3228 	}
3229 
3230 	return error;
3231 }
3232 
3233 /* remove an existing lkb from the granted queue */
3234 
3235 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3236 {
3237 	int error;
3238 
3239 	if (is_remote(r)) {
3240 		/* receive_unlock() calls do_unlock() on remote node */
3241 		error = send_unlock(r, lkb);
3242 	} else {
3243 		error = do_unlock(r, lkb);
3244 		/* for remote locks the unlock_reply is sent
3245 		   between do_unlock and do_unlock_effects */
3246 		do_unlock_effects(r, lkb, error);
3247 	}
3248 
3249 	return error;
3250 }
3251 
3252 /* remove an existing lkb from the convert or wait queue */
3253 
3254 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3255 {
3256 	int error;
3257 
3258 	if (is_remote(r)) {
3259 		/* receive_cancel() calls do_cancel() on remote node */
3260 		error = send_cancel(r, lkb);
3261 	} else {
3262 		error = do_cancel(r, lkb);
3263 		/* for remote locks the cancel_reply is sent
3264 		   between do_cancel and do_cancel_effects */
3265 		do_cancel_effects(r, lkb, error);
3266 	}
3267 
3268 	return error;
3269 }
3270 
3271 /*
3272  * Four stage 2 varieties:
3273  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3274  */
3275 
3276 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3277 			const void *name, int len,
3278 			struct dlm_args *args)
3279 {
3280 	struct dlm_rsb *r;
3281 	int error;
3282 
3283 	error = validate_lock_args(ls, lkb, args);
3284 	if (error)
3285 		return error;
3286 
3287 	error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3288 	if (error)
3289 		return error;
3290 
3291 	lock_rsb(r);
3292 
3293 	attach_lkb(r, lkb);
3294 	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3295 
3296 	error = _request_lock(r, lkb);
3297 
3298 	unlock_rsb(r);
3299 	put_rsb(r);
3300 	return error;
3301 }
3302 
3303 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3304 			struct dlm_args *args)
3305 {
3306 	struct dlm_rsb *r;
3307 	int error;
3308 
3309 	r = lkb->lkb_resource;
3310 
3311 	hold_rsb(r);
3312 	lock_rsb(r);
3313 
3314 	error = validate_lock_args(ls, lkb, args);
3315 	if (error)
3316 		goto out;
3317 
3318 	error = _convert_lock(r, lkb);
3319  out:
3320 	unlock_rsb(r);
3321 	put_rsb(r);
3322 	return error;
3323 }
3324 
3325 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3326 		       struct dlm_args *args)
3327 {
3328 	struct dlm_rsb *r;
3329 	int error;
3330 
3331 	r = lkb->lkb_resource;
3332 
3333 	hold_rsb(r);
3334 	lock_rsb(r);
3335 
3336 	error = validate_unlock_args(lkb, args);
3337 	if (error)
3338 		goto out;
3339 
3340 	error = _unlock_lock(r, lkb);
3341  out:
3342 	unlock_rsb(r);
3343 	put_rsb(r);
3344 	return error;
3345 }
3346 
3347 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3348 		       struct dlm_args *args)
3349 {
3350 	struct dlm_rsb *r;
3351 	int error;
3352 
3353 	r = lkb->lkb_resource;
3354 
3355 	hold_rsb(r);
3356 	lock_rsb(r);
3357 
3358 	error = validate_unlock_args(lkb, args);
3359 	if (error)
3360 		goto out;
3361 
3362 	error = _cancel_lock(r, lkb);
3363  out:
3364 	unlock_rsb(r);
3365 	put_rsb(r);
3366 	return error;
3367 }
3368 
3369 /*
3370  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
3371  */
3372 
3373 int dlm_lock(dlm_lockspace_t *lockspace,
3374 	     int mode,
3375 	     struct dlm_lksb *lksb,
3376 	     uint32_t flags,
3377 	     const void *name,
3378 	     unsigned int namelen,
3379 	     uint32_t parent_lkid,
3380 	     void (*ast) (void *astarg),
3381 	     void *astarg,
3382 	     void (*bast) (void *astarg, int mode))
3383 {
3384 	struct dlm_ls *ls;
3385 	struct dlm_lkb *lkb;
3386 	struct dlm_args args;
3387 	int error, convert = flags & DLM_LKF_CONVERT;
3388 
3389 	ls = dlm_find_lockspace_local(lockspace);
3390 	if (!ls)
3391 		return -EINVAL;
3392 
3393 	dlm_lock_recovery(ls);
3394 
3395 	if (convert)
3396 		error = find_lkb(ls, lksb->sb_lkid, &lkb);
3397 	else
3398 		error = create_lkb(ls, &lkb);
3399 
3400 	if (error)
3401 		goto out;
3402 
3403 	trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
3404 
3405 	error = set_lock_args(mode, lksb, flags, namelen, ast, astarg, bast,
3406 			      &args);
3407 	if (error)
3408 		goto out_put;
3409 
3410 	if (convert)
3411 		error = convert_lock(ls, lkb, &args);
3412 	else
3413 		error = request_lock(ls, lkb, name, namelen, &args);
3414 
3415 	if (error == -EINPROGRESS)
3416 		error = 0;
3417  out_put:
3418 	trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true);
3419 
3420 	if (convert || error)
3421 		__put_lkb(ls, lkb);
3422 	if (error == -EAGAIN || error == -EDEADLK)
3423 		error = 0;
3424  out:
3425 	dlm_unlock_recovery(ls);
3426 	dlm_put_lockspace(ls);
3427 	return error;
3428 }
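/* Editorial usage sketch, not part of the original source ("ls" is an
 * already joined lockspace and the completion handling is only schematic):
 *
 *	static struct dlm_lksb my_lksb;
 *
 *	static void my_ast(void *arg)
 *	{
 *		struct dlm_lksb *lksb = arg;
 *		// lksb->sb_status: 0 on grant, -EAGAIN, -EDEADLK, ...
 *	}
 *
 *	// acquire a new EX lock on "myres"
 *	error = dlm_lock(ls, DLM_LOCK_EX, &my_lksb, 0, "myres", 5,
 *			 0, my_ast, &my_lksb, NULL);
 *
 *	// later, convert the lock identified by sb_lkid down to PR
 *	error = dlm_lock(ls, DLM_LOCK_PR, &my_lksb, DLM_LKF_CONVERT,
 *			 NULL, 0, 0, my_ast, &my_lksb, NULL);
 *
 * A return of 0 only means the request was accepted; the outcome is
 * delivered asynchronously through the ast with the result in
 * sb_status. */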
3429 
3430 int dlm_unlock(dlm_lockspace_t *lockspace,
3431 	       uint32_t lkid,
3432 	       uint32_t flags,
3433 	       struct dlm_lksb *lksb,
3434 	       void *astarg)
3435 {
3436 	struct dlm_ls *ls;
3437 	struct dlm_lkb *lkb;
3438 	struct dlm_args args;
3439 	int error;
3440 
3441 	ls = dlm_find_lockspace_local(lockspace);
3442 	if (!ls)
3443 		return -EINVAL;
3444 
3445 	dlm_lock_recovery(ls);
3446 
3447 	error = find_lkb(ls, lkid, &lkb);
3448 	if (error)
3449 		goto out;
3450 
3451 	trace_dlm_unlock_start(ls, lkb, flags);
3452 
3453 	error = set_unlock_args(flags, astarg, &args);
3454 	if (error)
3455 		goto out_put;
3456 
3457 	if (flags & DLM_LKF_CANCEL)
3458 		error = cancel_lock(ls, lkb, &args);
3459 	else
3460 		error = unlock_lock(ls, lkb, &args);
3461 
3462 	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3463 		error = 0;
3464 	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3465 		error = 0;
3466  out_put:
3467 	trace_dlm_unlock_end(ls, lkb, flags, error);
3468 
3469 	dlm_put_lkb(lkb);
3470  out:
3471 	dlm_unlock_recovery(ls);
3472 	dlm_put_lockspace(ls);
3473 	return error;
3474 }
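/* Editorial usage sketch, continuing the hypothetical example above (not
 * part of the original source):
 *
 *	// release the lock; the ast runs with sb_status == -DLM_EUNLOCK
 *	error = dlm_unlock(ls, my_lksb.sb_lkid, 0, &my_lksb, &my_lksb);
 *
 *	// or cancel a request/conversion that is still blocked; the ast
 *	// runs with sb_status == -DLM_ECANCEL
 *	error = dlm_unlock(ls, my_lksb.sb_lkid, DLM_LKF_CANCEL,
 *			   &my_lksb, &my_lksb);
 *
 * A return of 0 covers both the immediate -DLM_EUNLOCK/-DLM_ECANCEL
 * results (mapped to 0 above) and the overlap cases; the final status
 * always arrives through the ast in sb_status. */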
3475 
3476 /*
3477  * send/receive routines for remote operations and replies
3478  *
3479  * send_args
3480  * send_common
3481  * send_request			receive_request
3482  * send_convert			receive_convert
3483  * send_unlock			receive_unlock
3484  * send_cancel			receive_cancel
3485  * send_grant			receive_grant
3486  * send_bast			receive_bast
3487  * send_lookup			receive_lookup
3488  * send_remove			receive_remove
3489  *
3490  * 				send_common_reply
3491  * receive_request_reply	send_request_reply
3492  * receive_convert_reply	send_convert_reply
3493  * receive_unlock_reply		send_unlock_reply
3494  * receive_cancel_reply		send_cancel_reply
3495  * receive_lookup_reply		send_lookup_reply
3496  */
3497 
3498 static int _create_message(struct dlm_ls *ls, int mb_len,
3499 			   int to_nodeid, int mstype,
3500 			   struct dlm_message **ms_ret,
3501 			   struct dlm_mhandle **mh_ret)
3502 {
3503 	struct dlm_message *ms;
3504 	struct dlm_mhandle *mh;
3505 	char *mb;
3506 
3507 	/* get_buffer gives us a message handle (mh) that we need to
3508 	   pass into midcomms_commit and a message buffer (mb) that we
3509 	   write our data into */
3510 
3511 	mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb);
3512 	if (!mh)
3513 		return -ENOBUFS;
3514 
3515 	ms = (struct dlm_message *) mb;
3516 
3517 	ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3518 	ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
3519 	ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
3520 	ms->m_header.h_length = cpu_to_le16(mb_len);
3521 	ms->m_header.h_cmd = DLM_MSG;
3522 
3523 	ms->m_type = cpu_to_le32(mstype);
3524 
3525 	*mh_ret = mh;
3526 	*ms_ret = ms;
3527 	return 0;
3528 }
3529 
3530 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3531 			  int to_nodeid, int mstype,
3532 			  struct dlm_message **ms_ret,
3533 			  struct dlm_mhandle **mh_ret)
3534 {
3535 	int mb_len = sizeof(struct dlm_message);
3536 
3537 	switch (mstype) {
3538 	case DLM_MSG_REQUEST:
3539 	case DLM_MSG_LOOKUP:
3540 	case DLM_MSG_REMOVE:
3541 		mb_len += r->res_length;
3542 		break;
3543 	case DLM_MSG_CONVERT:
3544 	case DLM_MSG_UNLOCK:
3545 	case DLM_MSG_REQUEST_REPLY:
3546 	case DLM_MSG_CONVERT_REPLY:
3547 	case DLM_MSG_GRANT:
3548 		if (lkb && lkb->lkb_lvbptr && (lkb->lkb_exflags & DLM_LKF_VALBLK))
3549 			mb_len += r->res_ls->ls_lvblen;
3550 		break;
3551 	}
3552 
3553 	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3554 			       ms_ret, mh_ret);
3555 }
3556 
3557 /* further lowcomms enhancements or alternate implementations may make
3558    the return value from this function useful at some point */
3559 
3560 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms,
3561 			const void *name, int namelen)
3562 {
3563 	dlm_midcomms_commit_mhandle(mh, name, namelen);
3564 	return 0;
3565 }
3566 
3567 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3568 		      struct dlm_message *ms)
3569 {
3570 	ms->m_nodeid   = cpu_to_le32(lkb->lkb_nodeid);
3571 	ms->m_pid      = cpu_to_le32(lkb->lkb_ownpid);
3572 	ms->m_lkid     = cpu_to_le32(lkb->lkb_id);
3573 	ms->m_remid    = cpu_to_le32(lkb->lkb_remid);
3574 	ms->m_exflags  = cpu_to_le32(lkb->lkb_exflags);
3575 	ms->m_sbflags  = cpu_to_le32(dlm_sbflags_val(lkb));
3576 	ms->m_flags    = cpu_to_le32(dlm_dflags_val(lkb));
3577 	ms->m_lvbseq   = cpu_to_le32(lkb->lkb_lvbseq);
3578 	ms->m_status   = cpu_to_le32(lkb->lkb_status);
3579 	ms->m_grmode   = cpu_to_le32(lkb->lkb_grmode);
3580 	ms->m_rqmode   = cpu_to_le32(lkb->lkb_rqmode);
3581 	ms->m_hash     = cpu_to_le32(r->res_hash);
3582 
3583 	/* m_result and m_bastmode are set from function args,
3584 	   not from lkb fields */
3585 
3586 	if (lkb->lkb_bastfn)
3587 		ms->m_asts |= cpu_to_le32(DLM_CB_BAST);
3588 	if (lkb->lkb_astfn)
3589 		ms->m_asts |= cpu_to_le32(DLM_CB_CAST);
3590 
3591 	/* compare with switch in create_message; send_remove() doesn't
3592 	   use send_args() */
3593 
3594 	switch (ms->m_type) {
3595 	case cpu_to_le32(DLM_MSG_REQUEST):
3596 	case cpu_to_le32(DLM_MSG_LOOKUP):
3597 		memcpy(ms->m_extra, r->res_name, r->res_length);
3598 		break;
3599 	case cpu_to_le32(DLM_MSG_CONVERT):
3600 	case cpu_to_le32(DLM_MSG_UNLOCK):
3601 	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3602 	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3603 	case cpu_to_le32(DLM_MSG_GRANT):
3604 		if (!lkb->lkb_lvbptr || !(lkb->lkb_exflags & DLM_LKF_VALBLK))
3605 			break;
3606 		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3607 		break;
3608 	}
3609 }
3610 
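/* common send path for request, convert, unlock and cancel, sent to the
   master (r->res_nodeid): the lkb is put on the waiters list before the
   message is built so an immediate reply can find it; on failure the
   waiters entry is removed again */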
3611 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3612 {
3613 	struct dlm_message *ms;
3614 	struct dlm_mhandle *mh;
3615 	int to_nodeid, error;
3616 
3617 	to_nodeid = r->res_nodeid;
3618 
3619 	add_to_waiters(lkb, mstype, to_nodeid);
3620 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3621 	if (error)
3622 		goto fail;
3623 
3624 	send_args(r, lkb, ms);
3625 
3626 	error = send_message(mh, ms, r->res_name, r->res_length);
3627 	if (error)
3628 		goto fail;
3629 	return 0;
3630 
3631  fail:
3632 	remove_from_waiters(lkb, msg_reply_type(mstype));
3633 	return error;
3634 }
3635 
3636 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3637 {
3638 	return send_common(r, lkb, DLM_MSG_REQUEST);
3639 }
3640 
3641 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3642 {
3643 	int error;
3644 
3645 	error = send_common(r, lkb, DLM_MSG_CONVERT);
3646 
3647 	/* down conversions go without a reply from the master */
3648 	if (!error && down_conversion(lkb)) {
3649 		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3650 		r->res_ls->ls_local_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
3651 		r->res_ls->ls_local_ms.m_result = 0;
3652 		__receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms, true);
3653 	}
3654 
3655 	return error;
3656 }
3657 
3658 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3659    MASTER_UNCERTAIN to force the next request on the rsb to confirm
3660    that the master is still correct. */
3661 
3662 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3663 {
3664 	return send_common(r, lkb, DLM_MSG_UNLOCK);
3665 }
3666 
3667 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3668 {
3669 	return send_common(r, lkb, DLM_MSG_CANCEL);
3670 }
3671 
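/* grant messages are sent by the master to the process-copy node
   (lkb->lkb_nodeid); no reply is expected, so the lkb is not added to
   the waiters list */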
3672 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3673 {
3674 	struct dlm_message *ms;
3675 	struct dlm_mhandle *mh;
3676 	int to_nodeid, error;
3677 
3678 	to_nodeid = lkb->lkb_nodeid;
3679 
3680 	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3681 	if (error)
3682 		goto out;
3683 
3684 	send_args(r, lkb, ms);
3685 
3686 	ms->m_result = 0;
3687 
3688 	error = send_message(mh, ms, r->res_name, r->res_length);
3689  out:
3690 	return error;
3691 }
3692 
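/* basts are likewise unreplied async messages from the master to the
   process-copy node; the blocking mode is carried in m_bastmode */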
3693 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3694 {
3695 	struct dlm_message *ms;
3696 	struct dlm_mhandle *mh;
3697 	int to_nodeid, error;
3698 
3699 	to_nodeid = lkb->lkb_nodeid;
3700 
3701 	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3702 	if (error)
3703 		goto out;
3704 
3705 	send_args(r, lkb, ms);
3706 
3707 	ms->m_bastmode = cpu_to_le32(mode);
3708 
3709 	error = send_message(mh, ms, r->res_name, r->res_length);
3710  out:
3711 	return error;
3712 }
3713 
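/* lookups are sent to the resource's directory node; as in send_common(),
   the lkb is added to the waiters list first and removed again if
   sending fails */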
3714 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3715 {
3716 	struct dlm_message *ms;
3717 	struct dlm_mhandle *mh;
3718 	int to_nodeid, error;
3719 
3720 	to_nodeid = dlm_dir_nodeid(r);
3721 
3722 	add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3723 	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3724 	if (error)
3725 		goto fail;
3726 
3727 	send_args(r, lkb, ms);
3728 
3729 	error = send_message(mh, ms, r->res_name, r->res_length);
3730 	if (error)
3731 		goto fail;
3732 	return 0;
3733 
3734  fail:
3735 	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3736 	return error;
3737 }
3738 
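/* ask the directory node to remove its entry for the rsb; only the
   resource name and hash are sent, and there is no reply */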
3739 static int send_remove(struct dlm_rsb *r)
3740 {
3741 	struct dlm_message *ms;
3742 	struct dlm_mhandle *mh;
3743 	int to_nodeid, error;
3744 
3745 	to_nodeid = dlm_dir_nodeid(r);
3746 
3747 	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3748 	if (error)
3749 		goto out;
3750 
3751 	memcpy(ms->m_extra, r->res_name, r->res_length);
3752 	ms->m_hash = cpu_to_le32(r->res_hash);
3753 
3754 	error = send_message(mh, ms, r->res_name, r->res_length);
3755  out:
3756 	return error;
3757 }
3758 
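/* common reply path: rv is the do_xxxx() result, converted to a dlm
   errno and carried back to lkb->lkb_nodeid in m_result */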
3759 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3760 			     int mstype, int rv)
3761 {
3762 	struct dlm_message *ms;
3763 	struct dlm_mhandle *mh;
3764 	int to_nodeid, error;
3765 
3766 	to_nodeid = lkb->lkb_nodeid;
3767 
3768 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3769 	if (error)
3770 		goto out;
3771 
3772 	send_args(r, lkb, ms);
3773 
3774 	ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3775 
3776 	error = send_message(mh, ms, r->res_name, r->res_length);
3777  out:
3778 	return error;
3779 }
3780 
3781 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3782 {
3783 	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3784 }
3785 
3786 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3787 {
3788 	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3789 }
3790 
3791 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3792 {
3793 	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3794 }
3795 
3796 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3797 {
3798 	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3799 }
3800 
3801 static int send_lookup_reply(struct dlm_ls *ls,
3802 			     const struct dlm_message *ms_in, int ret_nodeid,
3803 			     int rv)
3804 {
3805 	struct dlm_rsb *r = &ls->ls_local_rsb;
3806 	struct dlm_message *ms;
3807 	struct dlm_mhandle *mh;
3808 	int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
3809 
3810 	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3811 	if (error)
3812 		goto out;
3813 
3814 	ms->m_lkid = ms_in->m_lkid;
3815 	ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3816 	ms->m_nodeid = cpu_to_le32(ret_nodeid);
3817 
3818 	error = send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in));
3819  out:
3820 	return error;
3821 }
3822 
3823 /* which args we save from a received message depends heavily on the type
3824    of message, unlike the send side where we can safely send everything about
3825    the lkb for any type of message */
3826 
3827 static void receive_flags(struct dlm_lkb *lkb, const struct dlm_message *ms)
3828 {
3829 	lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
3830 	dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3831 	dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3832 }
3833 
3834 static void receive_flags_reply(struct dlm_lkb *lkb,
3835 				const struct dlm_message *ms,
3836 				bool local)
3837 {
3838 	if (local)
3839 		return;
3840 
3841 	dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3842 	dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3843 }
3844 
3845 static int receive_extralen(const struct dlm_message *ms)
3846 {
3847 	return (le16_to_cpu(ms->m_header.h_length) -
3848 		sizeof(struct dlm_message));
3849 }
3850 
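/* if the lock uses a value block, copy the lvb carried in the message's
   extra area into the lkb, allocating a buffer if needed and clamping
   the copy to ls_lvblen */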
3851 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3852 		       const struct dlm_message *ms)
3853 {
3854 	int len;
3855 
3856 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3857 		if (!lkb->lkb_lvbptr)
3858 			lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3859 		if (!lkb->lkb_lvbptr)
3860 			return -ENOMEM;
3861 		len = receive_extralen(ms);
3862 		if (len > ls->ls_lvblen)
3863 			len = ls->ls_lvblen;
3864 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3865 	}
3866 	return 0;
3867 }
3868 
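/* placeholders stored in master-copy lkbs so the usual non-NULL tests
   on lkb_bastfn/lkb_astfn reflect whether the remote lock asked for
   callbacks; they are never called on the master */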
3869 static void fake_bastfn(void *astparam, int mode)
3870 {
3871 	log_print("fake_bastfn should not be called");
3872 }
3873 
3874 static void fake_astfn(void *astparam)
3875 {
3876 	log_print("fake_astfn should not be called");
3877 }
3878 
3879 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3880 				const struct dlm_message *ms)
3881 {
3882 	lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3883 	lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
3884 	lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3885 	lkb->lkb_grmode = DLM_LOCK_IV;
3886 	lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3887 
3888 	lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;
3889 	lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL;
3890 
3891 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3892 		/* lkb was just created so there won't be an lvb yet */
3893 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3894 		if (!lkb->lkb_lvbptr)
3895 			return -ENOMEM;
3896 	}
3897 
3898 	return 0;
3899 }
3900 
3901 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3902 				const struct dlm_message *ms)
3903 {
3904 	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3905 		return -EBUSY;
3906 
3907 	if (receive_lvb(ls, lkb, ms))
3908 		return -ENOMEM;
3909 
3910 	lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3911 	lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
3912 
3913 	return 0;
3914 }
3915 
3916 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3917 			       const struct dlm_message *ms)
3918 {
3919 	if (receive_lvb(ls, lkb, ms))
3920 		return -ENOMEM;
3921 	return 0;
3922 }
3923 
3924 /* We fill in the local-lkb fields with the info that send_xxxx_reply()
3925    uses to send a reply and that the remote end uses to process the reply. */
3926 
3927 static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms)
3928 {
3929 	struct dlm_lkb *lkb = &ls->ls_local_lkb;
3930 	lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3931 	lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3932 }
3933 
3934 /* This is called after the rsb is locked so that we can safely inspect
3935    fields in the lkb. */
3936 
3937 static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
3938 {
3939 	int from = le32_to_cpu(ms->m_header.h_nodeid);
3940 	int error = 0;
3941 
3942 	/* currently mixing user and kernel locks is not supported */
3943 	if (ms->m_flags & cpu_to_le32(BIT(DLM_DFL_USER_BIT)) &&
3944 	    !test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
3945 		log_error(lkb->lkb_resource->res_ls,
3946 			  "got user dlm message for a kernel lock");
3947 		error = -EINVAL;
3948 		goto out;
3949 	}
3950 
3951 	switch (ms->m_type) {
3952 	case cpu_to_le32(DLM_MSG_CONVERT):
3953 	case cpu_to_le32(DLM_MSG_UNLOCK):
3954 	case cpu_to_le32(DLM_MSG_CANCEL):
3955 		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3956 			error = -EINVAL;
3957 		break;
3958 
3959 	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3960 	case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
3961 	case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
3962 	case cpu_to_le32(DLM_MSG_GRANT):
3963 	case cpu_to_le32(DLM_MSG_BAST):
3964 		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3965 			error = -EINVAL;
3966 		break;
3967 
3968 	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3969 		if (!is_process_copy(lkb))
3970 			error = -EINVAL;
3971 		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3972 			error = -EINVAL;
3973 		break;
3974 
3975 	default:
3976 		error = -EINVAL;
3977 	}
3978 
3979 out:
3980 	if (error)
3981 		log_error(lkb->lkb_resource->res_ls,
3982 			  "ignore invalid message %d from %d %x %x %x %d",
3983 			  le32_to_cpu(ms->m_type), from, lkb->lkb_id,
3984 			  lkb->lkb_remid, dlm_iflags_val(lkb),
3985 			  lkb->lkb_nodeid);
3986 	return error;
3987 }
3988 
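/* master side of a remote request: create a master-copy lkb, find or
   create the rsb, run do_request() and send the reply */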
3989 static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms)
3990 {
3991 	struct dlm_lkb *lkb;
3992 	struct dlm_rsb *r;
3993 	int from_nodeid;
3994 	int error, namelen = 0;
3995 
3996 	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3997 
3998 	error = create_lkb(ls, &lkb);
3999 	if (error)
4000 		goto fail;
4001 
4002 	receive_flags(lkb, ms);
4003 	set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
4004 	error = receive_request_args(ls, lkb, ms);
4005 	if (error) {
4006 		__put_lkb(ls, lkb);
4007 		goto fail;
4008 	}
4009 
4010 	/* The dir node is the authority on whether we are the master
4011 	   for this rsb or not, so if the dir node sends us a request, we should
4012 	   recreate the rsb if we've destroyed it.   This race happens when we
4013 	   send a remove message to the dir node at the same time that the dir
4014 	   node sends us a request for the rsb. */
4015 
4016 	namelen = receive_extralen(ms);
4017 
4018 	error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4019 			 R_RECEIVE_REQUEST, &r);
4020 	if (error) {
4021 		__put_lkb(ls, lkb);
4022 		goto fail;
4023 	}
4024 
4025 	lock_rsb(r);
4026 
4027 	if (r->res_master_nodeid != dlm_our_nodeid()) {
4028 		error = validate_master_nodeid(ls, r, from_nodeid);
4029 		if (error) {
4030 			unlock_rsb(r);
4031 			put_rsb(r);
4032 			__put_lkb(ls, lkb);
4033 			goto fail;
4034 		}
4035 	}
4036 
4037 	attach_lkb(r, lkb);
4038 	error = do_request(r, lkb);
4039 	send_request_reply(r, lkb, error);
4040 	do_request_effects(r, lkb, error);
4041 
4042 	unlock_rsb(r);
4043 	put_rsb(r);
4044 
4045 	if (error == -EINPROGRESS)
4046 		error = 0;
4047 	if (error)
4048 		dlm_put_lkb(lkb);
4049 	return 0;
4050 
4051  fail:
4052 	/* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4053 	   and do this receive_request again from process_lookup_list once
4054 	   we get the lookup reply.  This would avoid many repeated
4055 	   ENOTBLK request failures when the lookup reply designating us
4056 	   as master is delayed. */
4057 
4058 	if (error != -ENOTBLK) {
4059 		log_limit(ls, "receive_request %x from %d %d",
4060 			  le32_to_cpu(ms->m_lkid), from_nodeid, error);
4061 	}
4062 
4063 	setup_local_lkb(ls, ms);
4064 	send_request_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4065 	return error;
4066 }
4067 
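/* master side of a remote convert; for a down-conversion no reply is
   sent because the requesting node completes it locally (see
   send_convert) */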
4068 static int receive_convert(struct dlm_ls *ls, const struct dlm_message *ms)
4069 {
4070 	struct dlm_lkb *lkb;
4071 	struct dlm_rsb *r;
4072 	int error, reply = 1;
4073 
4074 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4075 	if (error)
4076 		goto fail;
4077 
4078 	if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4079 		log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4080 			  "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4081 			  (unsigned long long)lkb->lkb_recover_seq,
4082 			  le32_to_cpu(ms->m_header.h_nodeid),
4083 			  le32_to_cpu(ms->m_lkid));
4084 		error = -ENOENT;
4085 		dlm_put_lkb(lkb);
4086 		goto fail;
4087 	}
4088 
4089 	r = lkb->lkb_resource;
4090 
4091 	hold_rsb(r);
4092 	lock_rsb(r);
4093 
4094 	error = validate_message(lkb, ms);
4095 	if (error)
4096 		goto out;
4097 
4098 	receive_flags(lkb, ms);
4099 
4100 	error = receive_convert_args(ls, lkb, ms);
4101 	if (error) {
4102 		send_convert_reply(r, lkb, error);
4103 		goto out;
4104 	}
4105 
4106 	reply = !down_conversion(lkb);
4107 
4108 	error = do_convert(r, lkb);
4109 	if (reply)
4110 		send_convert_reply(r, lkb, error);
4111 	do_convert_effects(r, lkb, error);
4112  out:
4113 	unlock_rsb(r);
4114 	put_rsb(r);
4115 	dlm_put_lkb(lkb);
4116 	return 0;
4117 
4118  fail:
4119 	setup_local_lkb(ls, ms);
4120 	send_convert_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4121 	return error;
4122 }
4123 
4124 static int receive_unlock(struct dlm_ls *ls, const struct dlm_message *ms)
4125 {
4126 	struct dlm_lkb *lkb;
4127 	struct dlm_rsb *r;
4128 	int error;
4129 
4130 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4131 	if (error)
4132 		goto fail;
4133 
4134 	if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4135 		log_error(ls, "receive_unlock %x remid %x remote %d %x",
4136 			  lkb->lkb_id, lkb->lkb_remid,
4137 			  le32_to_cpu(ms->m_header.h_nodeid),
4138 			  le32_to_cpu(ms->m_lkid));
4139 		error = -ENOENT;
4140 		dlm_put_lkb(lkb);
4141 		goto fail;
4142 	}
4143 
4144 	r = lkb->lkb_resource;
4145 
4146 	hold_rsb(r);
4147 	lock_rsb(r);
4148 
4149 	error = validate_message(lkb, ms);
4150 	if (error)
4151 		goto out;
4152 
4153 	receive_flags(lkb, ms);
4154 
4155 	error = receive_unlock_args(ls, lkb, ms);
4156 	if (error) {
4157 		send_unlock_reply(r, lkb, error);
4158 		goto out;
4159 	}
4160 
4161 	error = do_unlock(r, lkb);
4162 	send_unlock_reply(r, lkb, error);
4163 	do_unlock_effects(r, lkb, error);
4164  out:
4165 	unlock_rsb(r);
4166 	put_rsb(r);
4167 	dlm_put_lkb(lkb);
4168 	return 0;
4169 
4170  fail:
4171 	setup_local_lkb(ls, ms);
4172 	send_unlock_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4173 	return error;
4174 }
4175 
4176 static int receive_cancel(struct dlm_ls *ls, const struct dlm_message *ms)
4177 {
4178 	struct dlm_lkb *lkb;
4179 	struct dlm_rsb *r;
4180 	int error;
4181 
4182 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4183 	if (error)
4184 		goto fail;
4185 
4186 	receive_flags(lkb, ms);
4187 
4188 	r = lkb->lkb_resource;
4189 
4190 	hold_rsb(r);
4191 	lock_rsb(r);
4192 
4193 	error = validate_message(lkb, ms);
4194 	if (error)
4195 		goto out;
4196 
4197 	error = do_cancel(r, lkb);
4198 	send_cancel_reply(r, lkb, error);
4199 	do_cancel_effects(r, lkb, error);
4200  out:
4201 	unlock_rsb(r);
4202 	put_rsb(r);
4203 	dlm_put_lkb(lkb);
4204 	return 0;
4205 
4206  fail:
4207 	setup_local_lkb(ls, ms);
4208 	send_cancel_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4209 	return error;
4210 }
4211 
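/* process-copy side of an async grant from the master: apply the reply
   flags, grant the lock locally and queue the completion ast */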
4212 static int receive_grant(struct dlm_ls *ls, const struct dlm_message *ms)
4213 {
4214 	struct dlm_lkb *lkb;
4215 	struct dlm_rsb *r;
4216 	int error;
4217 
4218 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4219 	if (error)
4220 		return error;
4221 
4222 	r = lkb->lkb_resource;
4223 
4224 	hold_rsb(r);
4225 	lock_rsb(r);
4226 
4227 	error = validate_message(lkb, ms);
4228 	if (error)
4229 		goto out;
4230 
4231 	receive_flags_reply(lkb, ms, false);
4232 	if (is_altmode(lkb))
4233 		munge_altmode(lkb, ms);
4234 	grant_lock_pc(r, lkb, ms);
4235 	queue_cast(r, lkb, 0);
4236  out:
4237 	unlock_rsb(r);
4238 	put_rsb(r);
4239 	dlm_put_lkb(lkb);
4240 	return 0;
4241 }
4242 
4243 static int receive_bast(struct dlm_ls *ls, const struct dlm_message *ms)
4244 {
4245 	struct dlm_lkb *lkb;
4246 	struct dlm_rsb *r;
4247 	int error;
4248 
4249 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4250 	if (error)
4251 		return error;
4252 
4253 	r = lkb->lkb_resource;
4254 
4255 	hold_rsb(r);
4256 	lock_rsb(r);
4257 
4258 	error = validate_message(lkb, ms);
4259 	if (error)
4260 		goto out;
4261 
4262 	queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode));
4263 	lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
4264  out:
4265 	unlock_rsb(r);
4266 	put_rsb(r);
4267 	dlm_put_lkb(lkb);
4268 	return 0;
4269 }
4270 
4271 static void receive_lookup(struct dlm_ls *ls, const struct dlm_message *ms)
4272 {
4273 	int len, error, ret_nodeid, from_nodeid, our_nodeid;
4274 
4275 	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4276 	our_nodeid = dlm_our_nodeid();
4277 
4278 	len = receive_extralen(ms);
4279 
4280 	error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4281 				  &ret_nodeid, NULL);
4282 
4283 	/* Optimization: we're master so treat lookup as a request */
4284 	if (!error && ret_nodeid == our_nodeid) {
4285 		receive_request(ls, ms);
4286 		return;
4287 	}
4288 	send_lookup_reply(ls, ms, ret_nodeid, error);
4289 }
4290 
4291 static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
4292 {
4293 	char name[DLM_RESNAME_MAXLEN+1];
4294 	struct dlm_rsb *r;
4295 	int rv, len, dir_nodeid, from_nodeid;
4296 
4297 	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4298 
4299 	len = receive_extralen(ms);
4300 
4301 	if (len > DLM_RESNAME_MAXLEN) {
4302 		log_error(ls, "receive_remove from %d bad len %d",
4303 			  from_nodeid, len);
4304 		return;
4305 	}
4306 
4307 	dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash));
4308 	if (dir_nodeid != dlm_our_nodeid()) {
4309 		log_error(ls, "receive_remove from %d bad nodeid %d",
4310 			  from_nodeid, dir_nodeid);
4311 		return;
4312 	}
4313 
4314 	/*
4315 	 * Look for an inactive rsb; if it's there, free it.
4316 	 * If the rsb is active, it's being used, and we should ignore this
4317 	 * message.  This is an expected race between the dir node sending a
4318 	 * request to the master node at the same time as the master node sends
4319 	 * a remove to the dir node.  The resolution to that race is for the
4320 	 * dir node to ignore the remove message, and the master node to
4321 	 * recreate the master rsb when it gets a request from the dir node for
4322 	 * an rsb it doesn't have.
4323 	 */
4324 
4325 	memset(name, 0, sizeof(name));
4326 	memcpy(name, ms->m_extra, len);
4327 
4328 	rcu_read_lock();
4329 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
4330 	if (rv) {
4331 		rcu_read_unlock();
4332 		/* should not happen */
4333 		log_error(ls, "%s from %d not found %s", __func__,
4334 			  from_nodeid, name);
4335 		return;
4336 	}
4337 
4338 	write_lock_bh(&ls->ls_rsbtbl_lock);
4339 	if (!rsb_flag(r, RSB_HASHED)) {
4340 		rcu_read_unlock();
4341 		write_unlock_bh(&ls->ls_rsbtbl_lock);
4342 		/* should not happen */
4343 		log_error(ls, "%s from %d got removed during removal %s",
4344 			  __func__, from_nodeid, name);
4345 		return;
4346 	}
4347 	/* at this stage the rsb can only be freed here */
4348 	rcu_read_unlock();
4349 
4350 	if (!rsb_flag(r, RSB_INACTIVE)) {
4351 		if (r->res_master_nodeid != from_nodeid) {
4352 			/* should not happen */
4353 			log_error(ls, "receive_remove on active rsb from %d master %d",
4354 				  from_nodeid, r->res_master_nodeid);
4355 			dlm_print_rsb(r);
4356 			write_unlock_bh(&ls->ls_rsbtbl_lock);
4357 			return;
4358 		}
4359 
4360 		/* Ignore the remove message, see race comment above. */
4361 
4362 		log_debug(ls, "receive_remove from %d master %d first %x %s",
4363 			  from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4364 			  name);
4365 		write_unlock_bh(&ls->ls_rsbtbl_lock);
4366 		return;
4367 	}
4368 
4369 	if (r->res_master_nodeid != from_nodeid) {
4370 		log_error(ls, "receive_remove inactive from %d master %d",
4371 			  from_nodeid, r->res_master_nodeid);
4372 		dlm_print_rsb(r);
4373 		write_unlock_bh(&ls->ls_rsbtbl_lock);
4374 		return;
4375 	}
4376 
4377 	list_del(&r->res_slow_list);
4378 	rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
4379 			       dlm_rhash_rsb_params);
4380 	rsb_clear_flag(r, RSB_HASHED);
4381 	write_unlock_bh(&ls->ls_rsbtbl_lock);
4382 
4383 	free_inactive_rsb(r);
4384 }
4385 
4386 static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms)
4387 {
4388 	do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
4389 }
4390 
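/* handle the master's reply to our request (or to a lookup that the dir
   node, also being the master, treated as a request); -EBADR and
   -ENOTBLK mean the remote node was not the master after all, so the
   request is normally retried via _request_lock() */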
4391 static int receive_request_reply(struct dlm_ls *ls,
4392 				 const struct dlm_message *ms)
4393 {
4394 	struct dlm_lkb *lkb;
4395 	struct dlm_rsb *r;
4396 	int error, mstype, result;
4397 	int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4398 
4399 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4400 	if (error)
4401 		return error;
4402 
4403 	r = lkb->lkb_resource;
4404 	hold_rsb(r);
4405 	lock_rsb(r);
4406 
4407 	error = validate_message(lkb, ms);
4408 	if (error)
4409 		goto out;
4410 
4411 	mstype = lkb->lkb_wait_type;
4412 	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4413 	if (error) {
4414 		log_error(ls, "receive_request_reply %x remote %d %x result %d",
4415 			  lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid),
4416 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4417 		dlm_dump_rsb(r);
4418 		goto out;
4419 	}
4420 
4421 	/* Optimization: the dir node was also the master, so it took our
4422 	   lookup as a request and sent a request reply instead of a lookup reply */
4423 	if (mstype == DLM_MSG_LOOKUP) {
4424 		r->res_master_nodeid = from_nodeid;
4425 		r->res_nodeid = from_nodeid;
4426 		lkb->lkb_nodeid = from_nodeid;
4427 	}
4428 
4429 	/* this is the value returned from do_request() on the master */
4430 	result = from_dlm_errno(le32_to_cpu(ms->m_result));
4431 
4432 	switch (result) {
4433 	case -EAGAIN:
4434 		/* request would block (be queued) on remote master */
4435 		queue_cast(r, lkb, -EAGAIN);
4436 		confirm_master(r, -EAGAIN);
4437 		unhold_lkb(lkb); /* undoes create_lkb() */
4438 		break;
4439 
4440 	case -EINPROGRESS:
4441 	case 0:
4442 		/* request was queued or granted on remote master */
4443 		receive_flags_reply(lkb, ms, false);
4444 		lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
4445 		if (is_altmode(lkb))
4446 			munge_altmode(lkb, ms);
4447 		if (result) {
4448 			add_lkb(r, lkb, DLM_LKSTS_WAITING);
4449 		} else {
4450 			grant_lock_pc(r, lkb, ms);
4451 			queue_cast(r, lkb, 0);
4452 		}
4453 		confirm_master(r, result);
4454 		break;
4455 
4456 	case -EBADR:
4457 	case -ENOTBLK:
4458 		/* find_rsb failed to find rsb or rsb wasn't master */
4459 		log_limit(ls, "receive_request_reply %x from %d %d "
4460 			  "master %d dir %d first %x %s", lkb->lkb_id,
4461 			  from_nodeid, result, r->res_master_nodeid,
4462 			  r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4463 
4464 		if (r->res_dir_nodeid != dlm_our_nodeid() &&
4465 		    r->res_master_nodeid != dlm_our_nodeid()) {
4466 			/* cause _request_lock->set_master->send_lookup */
4467 			r->res_master_nodeid = 0;
4468 			r->res_nodeid = -1;
4469 			lkb->lkb_nodeid = -1;
4470 		}
4471 
4472 		if (is_overlap(lkb)) {
4473 			/* we'll ignore error in cancel/unlock reply */
4474 			queue_cast_overlap(r, lkb);
4475 			confirm_master(r, result);
4476 			unhold_lkb(lkb); /* undoes create_lkb() */
4477 		} else {
4478 			_request_lock(r, lkb);
4479 
4480 			if (r->res_master_nodeid == dlm_our_nodeid())
4481 				confirm_master(r, 0);
4482 		}
4483 		break;
4484 
4485 	default:
4486 		log_error(ls, "receive_request_reply %x error %d",
4487 			  lkb->lkb_id, result);
4488 	}
4489 
4490 	if ((result == 0 || result == -EINPROGRESS) &&
4491 	    test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
4492 		log_debug(ls, "receive_request_reply %x result %d unlock",
4493 			  lkb->lkb_id, result);
4494 		clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4495 		send_unlock(r, lkb);
4496 	} else if ((result == -EINPROGRESS) &&
4497 		   test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
4498 				      &lkb->lkb_iflags)) {
4499 		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4500 		clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4501 		send_cancel(r, lkb);
4502 	} else {
4503 		clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4504 		clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4505 	}
4506  out:
4507 	unlock_rsb(r);
4508 	put_rsb(r);
4509 	dlm_put_lkb(lkb);
4510 	return 0;
4511 }
4512 
4513 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4514 				    const struct dlm_message *ms, bool local)
4515 {
4516 	/* this is the value returned from do_convert() on the master */
4517 	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4518 	case -EAGAIN:
4519 		/* convert would block (be queued) on remote master */
4520 		queue_cast(r, lkb, -EAGAIN);
4521 		break;
4522 
4523 	case -EDEADLK:
4524 		receive_flags_reply(lkb, ms, local);
4525 		revert_lock_pc(r, lkb);
4526 		queue_cast(r, lkb, -EDEADLK);
4527 		break;
4528 
4529 	case -EINPROGRESS:
4530 		/* convert was queued on remote master */
4531 		receive_flags_reply(lkb, ms, local);
4532 		if (is_demoted(lkb))
4533 			munge_demoted(lkb);
4534 		del_lkb(r, lkb);
4535 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4536 		break;
4537 
4538 	case 0:
4539 		/* convert was granted on remote master */
4540 		receive_flags_reply(lkb, ms, local);
4541 		if (is_demoted(lkb))
4542 			munge_demoted(lkb);
4543 		grant_lock_pc(r, lkb, ms);
4544 		queue_cast(r, lkb, 0);
4545 		break;
4546 
4547 	default:
4548 		log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4549 			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4550 			  le32_to_cpu(ms->m_lkid),
4551 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4552 		dlm_print_rsb(r);
4553 		dlm_print_lkb(lkb);
4554 	}
4555 }
4556 
4557 static void _receive_convert_reply(struct dlm_lkb *lkb,
4558 				   const struct dlm_message *ms, bool local)
4559 {
4560 	struct dlm_rsb *r = lkb->lkb_resource;
4561 	int error;
4562 
4563 	hold_rsb(r);
4564 	lock_rsb(r);
4565 
4566 	error = validate_message(lkb, ms);
4567 	if (error)
4568 		goto out;
4569 
4570 	error = remove_from_waiters_ms(lkb, ms, local);
4571 	if (error)
4572 		goto out;
4573 
4574 	__receive_convert_reply(r, lkb, ms, local);
4575  out:
4576 	unlock_rsb(r);
4577 	put_rsb(r);
4578 }
4579 
4580 static int receive_convert_reply(struct dlm_ls *ls,
4581 				 const struct dlm_message *ms)
4582 {
4583 	struct dlm_lkb *lkb;
4584 	int error;
4585 
4586 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4587 	if (error)
4588 		return error;
4589 
4590 	_receive_convert_reply(lkb, ms, false);
4591 	dlm_put_lkb(lkb);
4592 	return 0;
4593 }
4594 
4595 static void _receive_unlock_reply(struct dlm_lkb *lkb,
4596 				  const struct dlm_message *ms, bool local)
4597 {
4598 	struct dlm_rsb *r = lkb->lkb_resource;
4599 	int error;
4600 
4601 	hold_rsb(r);
4602 	lock_rsb(r);
4603 
4604 	error = validate_message(lkb, ms);
4605 	if (error)
4606 		goto out;
4607 
4608 	error = remove_from_waiters_ms(lkb, ms, local);
4609 	if (error)
4610 		goto out;
4611 
4612 	/* this is the value returned from do_unlock() on the master */
4613 
4614 	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4615 	case -DLM_EUNLOCK:
4616 		receive_flags_reply(lkb, ms, local);
4617 		remove_lock_pc(r, lkb);
4618 		queue_cast(r, lkb, -DLM_EUNLOCK);
4619 		break;
4620 	case -ENOENT:
4621 		break;
4622 	default:
4623 		log_error(r->res_ls, "receive_unlock_reply %x error %d",
4624 			  lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
4625 	}
4626  out:
4627 	unlock_rsb(r);
4628 	put_rsb(r);
4629 }
4630 
4631 static int receive_unlock_reply(struct dlm_ls *ls,
4632 				const struct dlm_message *ms)
4633 {
4634 	struct dlm_lkb *lkb;
4635 	int error;
4636 
4637 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4638 	if (error)
4639 		return error;
4640 
4641 	_receive_unlock_reply(lkb, ms, false);
4642 	dlm_put_lkb(lkb);
4643 	return 0;
4644 }
4645 
4646 static void _receive_cancel_reply(struct dlm_lkb *lkb,
4647 				  const struct dlm_message *ms, bool local)
4648 {
4649 	struct dlm_rsb *r = lkb->lkb_resource;
4650 	int error;
4651 
4652 	hold_rsb(r);
4653 	lock_rsb(r);
4654 
4655 	error = validate_message(lkb, ms);
4656 	if (error)
4657 		goto out;
4658 
4659 	error = remove_from_waiters_ms(lkb, ms, local);
4660 	if (error)
4661 		goto out;
4662 
4663 	/* this is the value returned from do_cancel() on the master */
4664 
4665 	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4666 	case -DLM_ECANCEL:
4667 		receive_flags_reply(lkb, ms, local);
4668 		revert_lock_pc(r, lkb);
4669 		queue_cast(r, lkb, -DLM_ECANCEL);
4670 		break;
4671 	case 0:
4672 		break;
4673 	default:
4674 		log_error(r->res_ls, "receive_cancel_reply %x error %d",
4675 			  lkb->lkb_id,
4676 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4677 	}
4678  out:
4679 	unlock_rsb(r);
4680 	put_rsb(r);
4681 }
4682 
4683 static int receive_cancel_reply(struct dlm_ls *ls,
4684 				const struct dlm_message *ms)
4685 {
4686 	struct dlm_lkb *lkb;
4687 	int error;
4688 
4689 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4690 	if (error)
4691 		return error;
4692 
4693 	_receive_cancel_reply(lkb, ms, false);
4694 	dlm_put_lkb(lkb);
4695 	return 0;
4696 }
4697 
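/* handle the dir node's reply to our lookup: record the master nodeid
   in the rsb and restart the request with _request_lock(), or process
   the lookup list if we turn out to be the master ourselves */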
4698 static void receive_lookup_reply(struct dlm_ls *ls,
4699 				 const struct dlm_message *ms)
4700 {
4701 	struct dlm_lkb *lkb;
4702 	struct dlm_rsb *r;
4703 	int error, ret_nodeid;
4704 	int do_lookup_list = 0;
4705 
4706 	error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
4707 	if (error) {
4708 		log_error(ls, "%s no lkid %x", __func__,
4709 			  le32_to_cpu(ms->m_lkid));
4710 		return;
4711 	}
4712 
4713 	/* ms->m_result is the value returned by dlm_master_lookup on dir node
4714 	   FIXME: will a non-zero error ever be returned? */
4715 
4716 	r = lkb->lkb_resource;
4717 	hold_rsb(r);
4718 	lock_rsb(r);
4719 
4720 	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4721 	if (error)
4722 		goto out;
4723 
4724 	ret_nodeid = le32_to_cpu(ms->m_nodeid);
4725 
4726 	/* We sometimes receive a request from the dir node for this
4727 	   rsb before we've received the dir node's lookup_reply for it.
4728 	   The request from the dir node implies we're the master, so we set
4729 	   ourselves as master in receive_request_reply, and verify here that
4730 	   we are indeed the master. */
4731 
4732 	if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4733 		/* This should never happen */
4734 		log_error(ls, "receive_lookup_reply %x from %d ret %d "
4735 			  "master %d dir %d our %d first %x %s",
4736 			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4737 			  ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
4738 			  dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4739 	}
4740 
4741 	if (ret_nodeid == dlm_our_nodeid()) {
4742 		r->res_master_nodeid = ret_nodeid;
4743 		r->res_nodeid = 0;
4744 		do_lookup_list = 1;
4745 		r->res_first_lkid = 0;
4746 	} else if (ret_nodeid == -1) {
4747 		/* the remote node doesn't believe it's the dir node */
4748 		log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4749 			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
4750 		r->res_master_nodeid = 0;
4751 		r->res_nodeid = -1;
4752 		lkb->lkb_nodeid = -1;
4753 	} else {
4754 		/* set_master() will set lkb_nodeid from r */
4755 		r->res_master_nodeid = ret_nodeid;
4756 		r->res_nodeid = ret_nodeid;
4757 	}
4758 
4759 	if (is_overlap(lkb)) {
4760 		log_debug(ls, "receive_lookup_reply %x unlock %x",
4761 			  lkb->lkb_id, dlm_iflags_val(lkb));
4762 		queue_cast_overlap(r, lkb);
4763 		unhold_lkb(lkb); /* undoes create_lkb() */
4764 		goto out_list;
4765 	}
4766 
4767 	_request_lock(r, lkb);
4768 
4769  out_list:
4770 	if (do_lookup_list)
4771 		process_lookup_list(r);
4772  out:
4773 	unlock_rsb(r);
4774 	put_rsb(r);
4775 	dlm_put_lkb(lkb);
4776 }
4777 
4778 static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4779 			     uint32_t saved_seq)
4780 {
4781 	int error = 0, noent = 0;
4782 
4783 	if (WARN_ON_ONCE(!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid)))) {
4784 		log_limit(ls, "receive %d from non-member %d %x %x %d",
4785 			  le32_to_cpu(ms->m_type),
4786 			  le32_to_cpu(ms->m_header.h_nodeid),
4787 			  le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4788 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4789 		return;
4790 	}
4791 
4792 	switch (ms->m_type) {
4793 
4794 	/* messages sent to a master node */
4795 
4796 	case cpu_to_le32(DLM_MSG_REQUEST):
4797 		error = receive_request(ls, ms);
4798 		break;
4799 
4800 	case cpu_to_le32(DLM_MSG_CONVERT):
4801 		error = receive_convert(ls, ms);
4802 		break;
4803 
4804 	case cpu_to_le32(DLM_MSG_UNLOCK):
4805 		error = receive_unlock(ls, ms);
4806 		break;
4807 
4808 	case cpu_to_le32(DLM_MSG_CANCEL):
4809 		noent = 1;
4810 		error = receive_cancel(ls, ms);
4811 		break;
4812 
4813 	/* messages sent from a master node (replies to above) */
4814 
4815 	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
4816 		error = receive_request_reply(ls, ms);
4817 		break;
4818 
4819 	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
4820 		error = receive_convert_reply(ls, ms);
4821 		break;
4822 
4823 	case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
4824 		error = receive_unlock_reply(ls, ms);
4825 		break;
4826 
4827 	case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
4828 		error = receive_cancel_reply(ls, ms);
4829 		break;
4830 
4831 	/* messages sent from a master node (only two types of async msg) */
4832 
4833 	case cpu_to_le32(DLM_MSG_GRANT):
4834 		noent = 1;
4835 		error = receive_grant(ls, ms);
4836 		break;
4837 
4838 	case cpu_to_le32(DLM_MSG_BAST):
4839 		noent = 1;
4840 		error = receive_bast(ls, ms);
4841 		break;
4842 
4843 	/* messages sent to a dir node */
4844 
4845 	case cpu_to_le32(DLM_MSG_LOOKUP):
4846 		receive_lookup(ls, ms);
4847 		break;
4848 
4849 	case cpu_to_le32(DLM_MSG_REMOVE):
4850 		receive_remove(ls, ms);
4851 		break;
4852 
4853 	/* messages sent from a dir node (remove has no reply) */
4854 
4855 	case cpu_to_le32(DLM_MSG_LOOKUP_REPLY):
4856 		receive_lookup_reply(ls, ms);
4857 		break;
4858 
4859 	/* other messages */
4860 
4861 	case cpu_to_le32(DLM_MSG_PURGE):
4862 		receive_purge(ls, ms);
4863 		break;
4864 
4865 	default:
4866 		log_error(ls, "unknown message type %d",
4867 			  le32_to_cpu(ms->m_type));
4868 	}
4869 
4870 	/*
4871 	 * When checking for ENOENT, we're checking the result of
4872 	 * find_lkb(m_remid):
4873 	 *
4874 	 * The lock id referenced in the message wasn't found.  This may
4875 	 * happen in normal usage for the async messages and cancel, so
4876 	 * only use log_debug for them.
4877 	 *
4878 	 * Some errors are expected and normal.
4879 	 */
4880 
4881 	if (error == -ENOENT && noent) {
4882 		log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4883 			  le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4884 			  le32_to_cpu(ms->m_header.h_nodeid),
4885 			  le32_to_cpu(ms->m_lkid), saved_seq);
4886 	} else if (error == -ENOENT) {
4887 		log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4888 			  le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4889 			  le32_to_cpu(ms->m_header.h_nodeid),
4890 			  le32_to_cpu(ms->m_lkid), saved_seq);
4891 
4892 		if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT))
4893 			dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash));
4894 	}
4895 
4896 	if (error == -EINVAL) {
4897 		log_error(ls, "receive %d inval from %d lkid %x remid %x "
4898 			  "saved_seq %u",
4899 			  le32_to_cpu(ms->m_type),
4900 			  le32_to_cpu(ms->m_header.h_nodeid),
4901 			  le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4902 			  saved_seq);
4903 	}
4904 }
4905 
4906 /* If the lockspace is in recovery mode (locking stopped), then normal
4907    messages are saved on the requestqueue for processing after recovery is
4908    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4909    messages off the requestqueue before we process new ones. This occurs right
4910    after recovery completes when we transition from saving all messages on
4911    requestqueue, to processing all the saved messages, to processing new
4912    messages as they arrive. */
4913 
4914 static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4915 				int nodeid)
4916 {
4917 try_again:
4918 	read_lock_bh(&ls->ls_requestqueue_lock);
4919 	if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4920 		/* If we were a member of this lockspace, left, and rejoined,
4921 		   other nodes may still be sending us messages from the
4922 		   lockspace generation before we left. */
4923 		if (WARN_ON_ONCE(!ls->ls_generation)) {
4924 			read_unlock_bh(&ls->ls_requestqueue_lock);
4925 			log_limit(ls, "receive %d from %d ignore old gen",
4926 				  le32_to_cpu(ms->m_type), nodeid);
4927 			return;
4928 		}
4929 
4930 		read_unlock_bh(&ls->ls_requestqueue_lock);
4931 		write_lock_bh(&ls->ls_requestqueue_lock);
4932 		/* recheck because we hold writelock now */
4933 		if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4934 			write_unlock_bh(&ls->ls_requestqueue_lock);
4935 			goto try_again;
4936 		}
4937 
4938 		dlm_add_requestqueue(ls, nodeid, ms);
4939 		write_unlock_bh(&ls->ls_requestqueue_lock);
4940 	} else {
4941 		_receive_message(ls, ms, 0);
4942 		read_unlock_bh(&ls->ls_requestqueue_lock);
4943 	}
4944 }
4945 
4946 /* This is called by dlm_recoverd to process messages that were saved on
4947    the requestqueue. */
4948 
4949 void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
4950 			       uint32_t saved_seq)
4951 {
4952 	_receive_message(ls, ms, saved_seq);
4953 }
4954 
4955 /* This is called by the midcomms layer when something is received for
4956    the lockspace.  It could be either a MSG (normal message sent as part of
4957    standard locking activity) or an RCOM (recovery message sent as part of
4958    lockspace recovery). */
4959 
4960 void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
4961 {
4962 	const struct dlm_header *hd = &p->header;
4963 	struct dlm_ls *ls;
4964 	int type = 0;
4965 
4966 	switch (hd->h_cmd) {
4967 	case DLM_MSG:
4968 		type = le32_to_cpu(p->message.m_type);
4969 		break;
4970 	case DLM_RCOM:
4971 		type = le32_to_cpu(p->rcom.rc_type);
4972 		break;
4973 	default:
4974 		log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4975 		return;
4976 	}
4977 
4978 	if (le32_to_cpu(hd->h_nodeid) != nodeid) {
4979 		log_print("invalid h_nodeid %d from %d lockspace %x",
4980 			  le32_to_cpu(hd->h_nodeid), nodeid,
4981 			  le32_to_cpu(hd->u.h_lockspace));
4982 		return;
4983 	}
4984 
4985 	ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace));
4986 	if (!ls) {
4987 		if (dlm_config.ci_log_debug) {
4988 			printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
4989 				"%u from %d cmd %d type %d\n",
4990 				le32_to_cpu(hd->u.h_lockspace), nodeid,
4991 				hd->h_cmd, type);
4992 		}
4993 
4994 		if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4995 			dlm_send_ls_not_ready(nodeid, &p->rcom);
4996 		return;
4997 	}
4998 
4999 	/* this rwlock allows dlm_ls_stop() to wait for all dlm_recv threads to
5000 	   be inactive (in this ls) before transitioning to recovery mode */
5001 
5002 	read_lock_bh(&ls->ls_recv_active);
5003 	if (hd->h_cmd == DLM_MSG)
5004 		dlm_receive_message(ls, &p->message, nodeid);
5005 	else if (hd->h_cmd == DLM_RCOM)
5006 		dlm_receive_rcom(ls, &p->rcom, nodeid);
5007 	else
5008 		log_error(ls, "invalid h_cmd %d from %d lockspace %x",
5009 			  hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
5010 	read_unlock_bh(&ls->ls_recv_active);
5011 
5012 	dlm_put_lockspace(ls);
5013 }
5014 
5015 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
5016 				   struct dlm_message *ms_local)
5017 {
5018 	if (middle_conversion(lkb) || lkb->lkb_rqmode >= lkb->lkb_grmode)
5019 		set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5020 
5021 	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5022 	   conversions are async; there's no reply from the remote master */
5023 }
5024 
5025 /* A waiting lkb needs recovery if the master node has failed, or
5026    the master node is changing (only when no directory is used) */
5027 
5028 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5029 				 int dir_nodeid)
5030 {
5031 	if (dlm_no_directory(ls))
5032 		return 1;
5033 
5034 	if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5035 		return 1;
5036 
5037 	return 0;
5038 }
5039 
5040 /* Recovery for locks that are waiting for replies from nodes that are now
5041    gone.  We can just complete unlocks and cancels by faking a reply from the
5042    dead node.  Requests and up-conversions we flag to be resent after
5043    recovery.  Down-conversions can just be completed with a fake reply like
5044    unlocks.  Conversions between PR and CW need special attention. */
5045 
5046 void dlm_recover_waiters_pre(struct dlm_ls *ls)
5047 {
5048 	struct dlm_lkb *lkb, *safe;
5049 	struct dlm_message *ms_local;
5050 	int wait_type, local_unlock_result, local_cancel_result;
5051 	int dir_nodeid;
5052 
5053 	ms_local = kmalloc(sizeof(*ms_local), GFP_KERNEL);
5054 	if (!ms_local)
5055 		return;
5056 
5057 	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5058 
5059 		dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5060 
5061 		/* exclude debug messages about unlocks because there can be so
5062 		   many and they aren't very interesting */
5063 
5064 		if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5065 			log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5066 				  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5067 				  lkb->lkb_id,
5068 				  lkb->lkb_remid,
5069 				  lkb->lkb_wait_type,
5070 				  lkb->lkb_resource->res_nodeid,
5071 				  lkb->lkb_nodeid,
5072 				  lkb->lkb_wait_nodeid,
5073 				  dir_nodeid);
5074 		}
5075 
5076 		/* all outstanding lookups, regardless of destination, will be
5077 		   resent after recovery is done */
5078 
5079 		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5080 			set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5081 			continue;
5082 		}
5083 
5084 		if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5085 			continue;
5086 
5087 		wait_type = lkb->lkb_wait_type;
5088 		local_unlock_result = -DLM_EUNLOCK;
5089 		local_cancel_result = -DLM_ECANCEL;
5090 
5091 		/* Main reply may have been received leaving a zero wait_type,
5092 		   but a reply for the overlapping op may not have been
5093 		   received.  In that case we need to fake the appropriate
5094 		   reply for the overlap op. */
5095 
5096 		if (!wait_type) {
5097 			if (is_overlap_cancel(lkb)) {
5098 				wait_type = DLM_MSG_CANCEL;
5099 				if (lkb->lkb_grmode == DLM_LOCK_IV)
5100 					local_cancel_result = 0;
5101 			}
5102 			if (is_overlap_unlock(lkb)) {
5103 				wait_type = DLM_MSG_UNLOCK;
5104 				if (lkb->lkb_grmode == DLM_LOCK_IV)
5105 					local_unlock_result = -ENOENT;
5106 			}
5107 
5108 			log_debug(ls, "rwpre overlap %x %x %d %d %d",
5109 				  lkb->lkb_id, dlm_iflags_val(lkb), wait_type,
5110 				  local_cancel_result, local_unlock_result);
5111 		}
5112 
5113 		switch (wait_type) {
5114 
5115 		case DLM_MSG_REQUEST:
5116 			set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5117 			break;
5118 
5119 		case DLM_MSG_CONVERT:
5120 			recover_convert_waiter(ls, lkb, ms_local);
5121 			break;
5122 
5123 		case DLM_MSG_UNLOCK:
5124 			hold_lkb(lkb);
5125 			memset(ms_local, 0, sizeof(struct dlm_message));
5126 			ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
5127 			ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result));
5128 			ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5129 			_receive_unlock_reply(lkb, ms_local, true);
5130 			dlm_put_lkb(lkb);
5131 			break;
5132 
5133 		case DLM_MSG_CANCEL:
5134 			hold_lkb(lkb);
5135 			memset(ms_local, 0, sizeof(struct dlm_message));
5136 			ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
5137 			ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result));
5138 			ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5139 			_receive_cancel_reply(lkb, ms_local, true);
5140 			dlm_put_lkb(lkb);
5141 			break;
5142 
5143 		default:
5144 			log_error(ls, "invalid lkb wait_type %d %d",
5145 				  lkb->lkb_wait_type, wait_type);
5146 		}
5147 		schedule();
5148 	}
5149 	kfree(ms_local);
5150 }
5151 
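/* return, with a reference held, the next waiter flagged for resend by
   dlm_recover_waiters_pre(), or NULL when none remain */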
5152 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5153 {
5154 	struct dlm_lkb *lkb = NULL, *iter;
5155 
5156 	spin_lock_bh(&ls->ls_waiters_lock);
5157 	list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
5158 		if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
5159 			hold_lkb(iter);
5160 			lkb = iter;
5161 			break;
5162 		}
5163 	}
5164 	spin_unlock_bh(&ls->ls_waiters_lock);
5165 
5166 	return lkb;
5167 }
5168 
5169 /*
5170  * Forced state reset for locks that were in the middle of remote operations
5171  * when recovery happened (i.e. lkbs that were on the waiters list, waiting
5172  * for a reply from a remote operation.)  The lkbs remaining on the waiters
5173  * list need to be reevaluated; some may need resending to a different node
5174  * than previously, and some may now need local handling rather than remote.
5175  *
5176  * First, the lkb state for the voided remote operation is forcibly reset,
5177  * equivalent to what remove_from_waiters() would normally do:
5178  * . lkb removed from ls_waiters list
5179  * . lkb wait_type cleared
5180  * . lkb waiters_count cleared
5181  * . lkb ref count decremented for each waiters_count (almost always 1,
5182  *   but possibly 2 in case of cancel/unlock overlapping, which means
5183  *   two remote replies were being expected for the lkb.)
5184  *
5185  * Second, the lkb is reprocessed like an original operation would be,
5186  * by passing it to _request_lock or _convert_lock, which will either
5187  * process the lkb operation locally, or send it to a remote node again
5188  * and put the lkb back onto the waiters list.
5189  *
5190  * When reprocessing the lkb, we may find that it's flagged for an overlapping
5191  * force-unlock or cancel, either from before recovery began, or after recovery
5192  * finished.  If this is the case, the unlock/cancel is done directly, and the
5193  * original operation is not initiated again (no _request_lock/_convert_lock.)
5194  */
5195 
5196 int dlm_recover_waiters_post(struct dlm_ls *ls)
5197 {
5198 	struct dlm_lkb *lkb;
5199 	struct dlm_rsb *r;
5200 	int error = 0, mstype, err, oc, ou;
5201 
5202 	while (1) {
5203 		if (dlm_locking_stopped(ls)) {
5204 			log_debug(ls, "recover_waiters_post aborted");
5205 			error = -EINTR;
5206 			break;
5207 		}
5208 
5209 		/*
5210 		 * Find an lkb from the waiters list that's been affected by
5211 		 * recovery node changes, and needs to be reprocessed.  Does
5212 		 * hold_lkb(), adding a refcount.
5213 		 */
5214 		lkb = find_resend_waiter(ls);
5215 		if (!lkb)
5216 			break;
5217 
5218 		r = lkb->lkb_resource;
5219 		hold_rsb(r);
5220 		lock_rsb(r);
5221 
5222 		/*
5223 		 * If the lkb has been flagged for a force unlock or cancel,
5224 		 * then the reprocessing below will be replaced by just doing
5225 		 * the unlock/cancel directly.
5226 		 */
5227 		mstype = lkb->lkb_wait_type;
5228 		oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
5229 					&lkb->lkb_iflags);
5230 		ou = test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT,
5231 					&lkb->lkb_iflags);
5232 		err = 0;
5233 
5234 		log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5235 			  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5236 			  "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5237 			  r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5238 			  dlm_dir_nodeid(r), oc, ou);
5239 
5240 		/*
5241 		 * No reply to the pre-recovery operation will now be received,
5242 		 * so a forced equivalent of remove_from_waiters() is needed to
5243 		 * reset the waiters state that was in place before recovery.
5244 		 */
5245 
5246 		clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5247 
5248 		/* Forcibly clear wait_type */
5249 		lkb->lkb_wait_type = 0;
5250 
5251 		/*
5252 		 * Forcibly reset wait_count and associated refcount.  The
5253 		 * wait_count will almost always be 1, but in case of an
5254 		 * overlapping unlock/cancel it could be 2: see where
5255 		 * add_to_waiters() finds the lkb is already on the waiters
5256 		 * list and does lkb_wait_count++; hold_lkb().
5257 		 */
5258 		while (lkb->lkb_wait_count) {
5259 			lkb->lkb_wait_count--;
5260 			unhold_lkb(lkb);
5261 		}
5262 
5263 		/* Forcibly remove from waiters list */
5264 		spin_lock_bh(&ls->ls_waiters_lock);
5265 		list_del_init(&lkb->lkb_wait_reply);
5266 		spin_unlock_bh(&ls->ls_waiters_lock);
5267 
5268 		/*
5269 		 * The lkb is now clear of all prior waiters state and can be
5270 		 * processed locally, or sent to remote node again, or directly
5271 		 * cancelled/unlocked.
5272 		 */
5273 
5274 		if (oc || ou) {
5275 			/* do an unlock or cancel instead of resending */
5276 			switch (mstype) {
5277 			case DLM_MSG_LOOKUP:
5278 			case DLM_MSG_REQUEST:
5279 				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5280 							-DLM_ECANCEL);
5281 				unhold_lkb(lkb); /* undoes create_lkb() */
5282 				break;
5283 			case DLM_MSG_CONVERT:
5284 				if (oc) {
5285 					queue_cast(r, lkb, -DLM_ECANCEL);
5286 				} else {
5287 					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5288 					_unlock_lock(r, lkb);
5289 				}
5290 				break;
5291 			default:
5292 				err = 1;
5293 			}
5294 		} else {
5295 			switch (mstype) {
5296 			case DLM_MSG_LOOKUP:
5297 			case DLM_MSG_REQUEST:
5298 				_request_lock(r, lkb);
5299 				if (r->res_nodeid != -1 && is_master(r))
5300 					confirm_master(r, 0);
5301 				break;
5302 			case DLM_MSG_CONVERT:
5303 				_convert_lock(r, lkb);
5304 				break;
5305 			default:
5306 				err = 1;
5307 			}
5308 		}
5309 
5310 		if (err) {
5311 			log_error(ls, "waiter %x msg %d r_nodeid %d "
5312 				  "dir_nodeid %d overlap %d %d",
5313 				  lkb->lkb_id, mstype, r->res_nodeid,
5314 				  dlm_dir_nodeid(r), oc, ou);
5315 		}
5316 		unlock_rsb(r);
5317 		put_rsb(r);
5318 		dlm_put_lkb(lkb);
5319 	}
5320 
5321 	return error;
5322 }
5323 
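/* free master-copy lkbs on the given rsb queue, except those rebuilt
   during the current recovery sequence */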
5324 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5325 			      struct list_head *list)
5326 {
5327 	struct dlm_lkb *lkb, *safe;
5328 
5329 	list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5330 		if (!is_master_copy(lkb))
5331 			continue;
5332 
5333 		/* don't purge lkbs we've added in recover_master_copy for
5334 		   the current recovery seq */
5335 
5336 		if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5337 			continue;
5338 
5339 		del_lkb(r, lkb);
5340 
5341 		/* this put should free the lkb */
5342 		if (!dlm_put_lkb(lkb))
5343 			log_error(ls, "purged mstcpy lkb not released");
5344 	}
5345 }
5346 
5347 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5348 {
5349 	struct dlm_ls *ls = r->res_ls;
5350 
5351 	purge_mstcpy_list(ls, r, &r->res_grantqueue);
5352 	purge_mstcpy_list(ls, r, &r->res_convertqueue);
5353 	purge_mstcpy_list(ls, r, &r->res_waitqueue);
5354 }
5355 
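/* remove master-copy lkbs held by departed nodes from the given queue;
   losing a PW/EX holder marks the rsb for lvb invalidation, and any
   removal marks it for grant reevaluation */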
5356 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5357 			    struct list_head *list,
5358 			    int nodeid_gone, unsigned int *count)
5359 {
5360 	struct dlm_lkb *lkb, *safe;
5361 
5362 	list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5363 		if (!is_master_copy(lkb))
5364 			continue;
5365 
5366 		if ((lkb->lkb_nodeid == nodeid_gone) ||
5367 		    dlm_is_removed(ls, lkb->lkb_nodeid)) {
5368 
5369 			/* tell recover_lvb to invalidate the lvb
5370 			   because a node holding EX/PW failed */
5371 			if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5372 			    (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5373 				rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5374 			}
5375 
5376 			del_lkb(r, lkb);
5377 
5378 			/* this put should free the lkb */
5379 			if (!dlm_put_lkb(lkb))
5380 				log_error(ls, "purged dead lkb not released");
5381 
5382 			rsb_set_flag(r, RSB_RECOVER_GRANT);
5383 
5384 			(*count)++;
5385 		}
5386 	}
5387 }
5388 
5389 /* Get rid of locks held by nodes that are gone. */
5390 
5391 void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list)
5392 {
5393 	struct dlm_rsb *r;
5394 	struct dlm_member *memb;
5395 	int nodes_count = 0;
5396 	int nodeid_gone = 0;
5397 	unsigned int lkb_count = 0;
5398 
5399 	/* cache one removed nodeid to optimize the common
5400 	   case of a single node removed */
5401 
5402 	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5403 		nodes_count++;
5404 		nodeid_gone = memb->nodeid;
5405 	}
5406 
5407 	if (!nodes_count)
5408 		return;
5409 
5410 	list_for_each_entry(r, root_list, res_root_list) {
5411 		lock_rsb(r);
5412 		if (r->res_nodeid != -1 && is_master(r)) {
5413 			purge_dead_list(ls, r, &r->res_grantqueue,
5414 					nodeid_gone, &lkb_count);
5415 			purge_dead_list(ls, r, &r->res_convertqueue,
5416 					nodeid_gone, &lkb_count);
5417 			purge_dead_list(ls, r, &r->res_waitqueue,
5418 					nodeid_gone, &lkb_count);
5419 		}
5420 		unlock_rsb(r);
5421 
5422 		cond_resched();
5423 	}
5424 
5425 	if (lkb_count)
5426 		log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5427 			  lkb_count, nodes_count);
5428 }
5429 
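/* return, with a reference held, the next active rsb flagged
   RECOVER_GRANT that we master; the flag is cleared on rsbs we no
   longer master */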
5430 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
5431 {
5432 	struct dlm_rsb *r;
5433 
5434 	read_lock_bh(&ls->ls_rsbtbl_lock);
5435 	list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
5436 		if (!rsb_flag(r, RSB_RECOVER_GRANT))
5437 			continue;
5438 		if (!is_master(r)) {
5439 			rsb_clear_flag(r, RSB_RECOVER_GRANT);
5440 			continue;
5441 		}
5442 		hold_rsb(r);
5443 		read_unlock_bh(&ls->ls_rsbtbl_lock);
5444 		return r;
5445 	}
5446 	read_unlock_bh(&ls->ls_rsbtbl_lock);
5447 	return NULL;
5448 }
5449 
5450 /*
5451  * Attempt to grant locks on resources that we are the master of.
5452  * Locks may have become grantable during recovery because locks
5453  * from departed nodes have been purged (or not rebuilt), allowing
5454  * previously blocked locks to now be granted.  The subset of rsb's
5455  * we are interested in are those with lkb's on either the convert or
5456  * waiting queues.
5457  *
5458  * Simplest would be to go through each master rsb and check for non-empty
5459  * convert or waiting queues, and attempt to grant on those rsbs.
5460  * Checking the queues requires lock_rsb, though, for which we'd need
5461  * to release the rsbtbl lock.  This would make iterating through all
5462  * rsb's very inefficient.  So, we rely on earlier recovery routines
5463  * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5464  * locks for.
5465  */
5466 
5467 void dlm_recover_grant(struct dlm_ls *ls)
5468 {
5469 	struct dlm_rsb *r;
5470 	unsigned int count = 0;
5471 	unsigned int rsb_count = 0;
5472 	unsigned int lkb_count = 0;
5473 
5474 	while (1) {
5475 		r = find_grant_rsb(ls);
5476 		if (!r)
5477 			break;
5478 
5479 		rsb_count++;
5480 		count = 0;
5481 		lock_rsb(r);
5482 		/* the RECOVER_GRANT flag is checked in the grant path */
5483 		grant_pending_locks(r, &count);
5484 		rsb_clear_flag(r, RSB_RECOVER_GRANT);
5485 		lkb_count += count;
5486 		confirm_master(r, 0);
5487 		unlock_rsb(r);
5488 		put_rsb(r);
5489 		cond_resched();
5490 	}
5491 
5492 	if (lkb_count)
5493 		log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5494 			  lkb_count, rsb_count);
5495 }
5496 
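/* search_remid() looks up a master-copy lkb by the owning node's id and that
   node's lock id (remid) across the rsb's grant, convert and wait queues.
   dlm_recover_master_copy() uses it to detect locks already rebuilt by an
   earlier, aborted recovery. */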
5497 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5498 					 uint32_t remid)
5499 {
5500 	struct dlm_lkb *lkb;
5501 
5502 	list_for_each_entry(lkb, head, lkb_statequeue) {
5503 		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5504 			return lkb;
5505 	}
5506 	return NULL;
5507 }
5508 
5509 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5510 				    uint32_t remid)
5511 {
5512 	struct dlm_lkb *lkb;
5513 
5514 	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5515 	if (lkb)
5516 		return lkb;
5517 	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5518 	if (lkb)
5519 		return lkb;
5520 	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5521 	if (lkb)
5522 		return lkb;
5523 	return NULL;
5524 }
5525 
5526 /* needs at least dlm_rcom + rcom_lock */
5527 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5528 				  struct dlm_rsb *r, const struct dlm_rcom *rc)
5529 {
5530 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5531 
5532 	lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5533 	lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5534 	lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5535 	lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5536 	dlm_set_dflags_val(lkb, le32_to_cpu(rl->rl_flags));
5537 	set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
5538 	lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5539 	lkb->lkb_rqmode = rl->rl_rqmode;
5540 	lkb->lkb_grmode = rl->rl_grmode;
5541 	/* don't set lkb_status because add_lkb wants to set it itself */
5542 
5543 	lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5544 	lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5545 
5546 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5547 		int lvblen = le16_to_cpu(rc->rc_header.h_length) -
5548 			sizeof(struct dlm_rcom) - sizeof(struct rcom_lock);
5549 		if (lvblen > ls->ls_lvblen)
5550 			return -EINVAL;
5551 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5552 		if (!lkb->lkb_lvbptr)
5553 			return -ENOMEM;
5554 		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5555 	}
5556 
5557 	/* Conversions between PR and CW (middle modes) need special handling.
5558 	   The real granted mode of these converting locks cannot be determined
5559 	   until all locks have been rebuilt on the rsb (recover_conversion) */
5560 
5561 	if (rl->rl_status == DLM_LKSTS_CONVERT && middle_conversion(lkb)) {
5562 		/* We may need to adjust grmode depending on other granted locks. */
5563 		log_rinfo(ls, "%s %x middle convert gr %d rq %d remote %d %x",
5564 			  __func__, lkb->lkb_id, lkb->lkb_grmode,
5565 			  lkb->lkb_rqmode, lkb->lkb_nodeid, lkb->lkb_remid);
5566 		rsb_set_flag(r, RSB_RECOVER_CONVERT);
5567 	}
5568 
5569 	return 0;
5570 }
5571 
5572 /* This lkb may have been recovered in a previous aborted recovery so we need
5573    to check if the rsb already has an lkb with the given remote nodeid/lkid.
5574    If so we just send back a standard reply.  If not, we create a new lkb with
5575    the given values and send back our lkid.  We send back our lkid by sending
5576    back the rcom_lock struct we got but with the remid field filled in. */
5577 
5578 /* needs at least dlm_rcom + rcom_lock */
5579 int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5580 			    __le32 *rl_remid, __le32 *rl_result)
5581 {
5582 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5583 	struct dlm_rsb *r;
5584 	struct dlm_lkb *lkb;
5585 	uint32_t remid = 0;
5586 	int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5587 	int error;
5588 
5589 	/* default the reply's rl_remid to the rl_remid from the received rcom_lock */
5590 	*rl_remid = rl->rl_remid;
5591 
5592 	if (rl->rl_parent_lkid) {
5593 		error = -EOPNOTSUPP;
5594 		goto out;
5595 	}
5596 
5597 	remid = le32_to_cpu(rl->rl_lkid);
5598 
5599 	/* In general we expect the rsb returned to be R_MASTER, but we don't
5600 	   have to require it.  Recovery of masters on one node can overlap
5601 	   recovery of locks on another node, so one node can send us MSTCPY
5602 	   locks before we've made ourselves master of this rsb.  We can still
5603 	   add new MSTCPY locks that we receive here without any harm; when
5604 	   we make ourselves master, dlm_recover_masters() won't touch the
5605 	   MSTCPY locks we've received early. */
5606 
5607 	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5608 			 from_nodeid, R_RECEIVE_RECOVER, &r);
5609 	if (error)
5610 		goto out;
5611 
5612 	lock_rsb(r);
5613 
5614 	if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5615 		log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5616 			  from_nodeid, remid);
5617 		error = -EBADR;
5618 		goto out_unlock;
5619 	}
5620 
5621 	lkb = search_remid(r, from_nodeid, remid);
5622 	if (lkb) {
5623 		error = -EEXIST;
5624 		goto out_remid;
5625 	}
5626 
5627 	error = create_lkb(ls, &lkb);
5628 	if (error)
5629 		goto out_unlock;
5630 
5631 	error = receive_rcom_lock_args(ls, lkb, r, rc);
5632 	if (error) {
5633 		__put_lkb(ls, lkb);
5634 		goto out_unlock;
5635 	}
5636 
5637 	attach_lkb(r, lkb);
5638 	add_lkb(r, lkb, rl->rl_status);
5639 	ls->ls_recover_locks_in++;
5640 
5641 	if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5642 		rsb_set_flag(r, RSB_RECOVER_GRANT);
5643 
5644  out_remid:
5645 	/* this is the new value returned to the lock holder for
5646 	   saving in its process-copy lkb */
5647 	*rl_remid = cpu_to_le32(lkb->lkb_id);
5648 
5649 	lkb->lkb_recover_seq = ls->ls_recover_seq;
5650 
5651  out_unlock:
5652 	unlock_rsb(r);
5653 	put_rsb(r);
5654  out:
5655 	if (error && error != -EEXIST)
5656 		log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5657 			  from_nodeid, remid, error);
5658 	*rl_result = cpu_to_le32(error);
5659 	return error;
5660 }
5661 
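/* Handle the reply from a new master for a lock we sent with
   dlm_send_rcom_lock().  On success (or -EEXIST) the master's lkid is saved
   in lkb_remid; on -EBADR the lock reached the new master too early and is
   simply resent (see the comment in the -EBADR case below). */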
5662 /* needs at least dlm_rcom + rcom_lock */
5663 int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5664 			     uint64_t seq)
5665 {
5666 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5667 	struct dlm_rsb *r;
5668 	struct dlm_lkb *lkb;
5669 	uint32_t lkid, remid;
5670 	int error, result;
5671 
5672 	lkid = le32_to_cpu(rl->rl_lkid);
5673 	remid = le32_to_cpu(rl->rl_remid);
5674 	result = le32_to_cpu(rl->rl_result);
5675 
5676 	error = find_lkb(ls, lkid, &lkb);
5677 	if (error) {
5678 		log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5679 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5680 			  result);
5681 		return error;
5682 	}
5683 
5684 	r = lkb->lkb_resource;
5685 	hold_rsb(r);
5686 	lock_rsb(r);
5687 
5688 	if (!is_process_copy(lkb)) {
5689 		log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5690 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5691 			  result);
5692 		dlm_dump_rsb(r);
5693 		unlock_rsb(r);
5694 		put_rsb(r);
5695 		dlm_put_lkb(lkb);
5696 		return -EINVAL;
5697 	}
5698 
5699 	switch (result) {
5700 	case -EBADR:
5701 		/* There's a chance the new master received our lock before
5702 		   dlm_recover_master_reply(); this wouldn't happen if we did
5703 		   a barrier between recover_masters and recover_locks. */
5704 
5705 		log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5706 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5707 			  result);
5708 
5709 		dlm_send_rcom_lock(r, lkb, seq);
5710 		goto out;
5711 	case -EEXIST:
5712 	case 0:
5713 		lkb->lkb_remid = remid;
5714 		break;
5715 	default:
5716 		log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5717 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5718 			  result);
5719 	}
5720 
5721 	/* an ack for dlm_recover_locks(), which waits for replies for
5722 	   all the locks it sends to new masters */
5723 	dlm_recovered_lock(r);
5724  out:
5725 	unlock_rsb(r);
5726 	put_rsb(r);
5727 	dlm_put_lkb(lkb);
5728 
5729 	return 0;
5730 }
5731 
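/* Request a new lock on behalf of a userspace process.  When the request is
   granted or still in progress, the lkb is added to the process's proc->locks
   list, which takes its own reference; for other results (including -EAGAIN,
   which is reported as 0) the lkb is released here. */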
5732 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5733 		     int mode, uint32_t flags, void *name, unsigned int namelen)
5734 {
5735 	struct dlm_lkb *lkb;
5736 	struct dlm_args args;
5737 	bool do_put = true;
5738 	int error;
5739 
5740 	dlm_lock_recovery(ls);
5741 
5742 	error = create_lkb(ls, &lkb);
5743 	if (error) {
5744 		kfree(ua);
5745 		goto out;
5746 	}
5747 
5748 	trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
5749 
5750 	if (flags & DLM_LKF_VALBLK) {
5751 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5752 		if (!ua->lksb.sb_lvbptr) {
5753 			kfree(ua);
5754 			error = -ENOMEM;
5755 			goto out_put;
5756 		}
5757 	}
5758 	error = set_lock_args(mode, &ua->lksb, flags, namelen, fake_astfn, ua,
5759 			      fake_bastfn, &args);
5760 	if (error) {
5761 		kfree(ua->lksb.sb_lvbptr);
5762 		ua->lksb.sb_lvbptr = NULL;
5763 		kfree(ua);
5764 		goto out_put;
5765 	}
5766 
5767 	/* After ua is attached to lkb it will be freed by dlm_free_lkb().
5768 	   When DLM_DFL_USER_BIT is set, the dlm knows that this is a userspace
5769 	   lock and that lkb_astparam is the dlm_user_args structure. */
5770 	set_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags);
5771 	error = request_lock(ls, lkb, name, namelen, &args);
5772 
5773 	switch (error) {
5774 	case 0:
5775 		break;
5776 	case -EINPROGRESS:
5777 		error = 0;
5778 		break;
5779 	case -EAGAIN:
5780 		error = 0;
5781 		fallthrough;
5782 	default:
5783 		goto out_put;
5784 	}
5785 
5786 	/* add this new lkb to the per-process list of locks */
5787 	spin_lock_bh(&ua->proc->locks_spin);
5788 	hold_lkb(lkb);
5789 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5790 	spin_unlock_bh(&ua->proc->locks_spin);
5791 	do_put = false;
5792  out_put:
5793 	trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
5794 	if (do_put)
5795 		__put_lkb(ls, lkb);
5796  out:
5797 	dlm_unlock_recovery(ls);
5798 	return error;
5799 }
5800 
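/* Convert an existing userspace lock identified by lkid.  -EINPROGRESS,
   -EAGAIN and -EDEADLK are reported as success here; the final result reaches
   the process through its completion callback. */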
5801 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5802 		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
5803 {
5804 	struct dlm_lkb *lkb;
5805 	struct dlm_args args;
5806 	struct dlm_user_args *ua;
5807 	int error;
5808 
5809 	dlm_lock_recovery(ls);
5810 
5811 	error = find_lkb(ls, lkid, &lkb);
5812 	if (error)
5813 		goto out;
5814 
5815 	trace_dlm_lock_start(ls, lkb, NULL, 0, mode, flags);
5816 
5817 	/* the user can change the params on its lock when converting it, or
5818 	   add an lvb that didn't exist before */
5819 
5820 	ua = lkb->lkb_ua;
5821 
5822 	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5823 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5824 		if (!ua->lksb.sb_lvbptr) {
5825 			error = -ENOMEM;
5826 			goto out_put;
5827 		}
5828 	}
5829 	if (lvb_in && ua->lksb.sb_lvbptr)
5830 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5831 
5832 	ua->xid = ua_tmp->xid;
5833 	ua->castparam = ua_tmp->castparam;
5834 	ua->castaddr = ua_tmp->castaddr;
5835 	ua->bastparam = ua_tmp->bastparam;
5836 	ua->bastaddr = ua_tmp->bastaddr;
5837 	ua->user_lksb = ua_tmp->user_lksb;
5838 
5839 	error = set_lock_args(mode, &ua->lksb, flags, 0, fake_astfn, ua,
5840 			      fake_bastfn, &args);
5841 	if (error)
5842 		goto out_put;
5843 
5844 	error = convert_lock(ls, lkb, &args);
5845 
5846 	if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5847 		error = 0;
5848  out_put:
5849 	trace_dlm_lock_end(ls, lkb, NULL, 0, mode, flags, error, false);
5850 	dlm_put_lkb(lkb);
5851  out:
5852 	dlm_unlock_recovery(ls);
5853 	kfree(ua_tmp);
5854 	return error;
5855 }
5856 
5857 /*
5858  * The caller asks for an orphan lock on a given resource with a given mode.
5859  * If a matching lock exists, it's moved to the owner's list of locks and
5860  * the lkid is returned.
5861  */
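/*
 * Returns -EAGAIN if an orphan for the resource exists only with a different
 * mode, and -ENOENT if no matching orphan exists at all.
 */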
5862 
5863 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5864 		     int mode, uint32_t flags, void *name, unsigned int namelen,
5865 		     uint32_t *lkid)
5866 {
5867 	struct dlm_lkb *lkb = NULL, *iter;
5868 	struct dlm_user_args *ua;
5869 	int found_other_mode = 0;
5870 	int rv = 0;
5871 
5872 	spin_lock_bh(&ls->ls_orphans_lock);
5873 	list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
5874 		if (iter->lkb_resource->res_length != namelen)
5875 			continue;
5876 		if (memcmp(iter->lkb_resource->res_name, name, namelen))
5877 			continue;
5878 		if (iter->lkb_grmode != mode) {
5879 			found_other_mode = 1;
5880 			continue;
5881 		}
5882 
5883 		lkb = iter;
5884 		list_del_init(&iter->lkb_ownqueue);
5885 		clear_bit(DLM_DFL_ORPHAN_BIT, &iter->lkb_dflags);
5886 		*lkid = iter->lkb_id;
5887 		break;
5888 	}
5889 	spin_unlock_bh(&ls->ls_orphans_lock);
5890 
5891 	if (!lkb && found_other_mode) {
5892 		rv = -EAGAIN;
5893 		goto out;
5894 	}
5895 
5896 	if (!lkb) {
5897 		rv = -ENOENT;
5898 		goto out;
5899 	}
5900 
5901 	lkb->lkb_exflags = flags;
5902 	lkb->lkb_ownpid = (int) current->pid;
5903 
5904 	ua = lkb->lkb_ua;
5905 
5906 	ua->proc = ua_tmp->proc;
5907 	ua->xid = ua_tmp->xid;
5908 	ua->castparam = ua_tmp->castparam;
5909 	ua->castaddr = ua_tmp->castaddr;
5910 	ua->bastparam = ua_tmp->bastparam;
5911 	ua->bastaddr = ua_tmp->bastaddr;
5912 	ua->user_lksb = ua_tmp->user_lksb;
5913 
5914 	/*
5915 	 * The lkb reference from the ls_orphans list was not
5916 	 * removed above, and is now considered the reference
5917 	 * for the proc locks list.
5918 	 */
5919 
5920 	spin_lock_bh(&ua->proc->locks_spin);
5921 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5922 	spin_unlock_bh(&ua->proc->locks_spin);
5923  out:
5924 	kfree(ua_tmp);
5925 	return rv;
5926 }
5927 
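/* Unlock a userspace lock identified by lkid.  Once the unlock is under way
   the lkb is moved from the proc->locks list to proc->unlocking, unless
   dlm_user_add_cb() has already taken it off the list. */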
5928 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5929 		    uint32_t flags, uint32_t lkid, char *lvb_in)
5930 {
5931 	struct dlm_lkb *lkb;
5932 	struct dlm_args args;
5933 	struct dlm_user_args *ua;
5934 	int error;
5935 
5936 	dlm_lock_recovery(ls);
5937 
5938 	error = find_lkb(ls, lkid, &lkb);
5939 	if (error)
5940 		goto out;
5941 
5942 	trace_dlm_unlock_start(ls, lkb, flags);
5943 
5944 	ua = lkb->lkb_ua;
5945 
5946 	if (lvb_in && ua->lksb.sb_lvbptr)
5947 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5948 	if (ua_tmp->castparam)
5949 		ua->castparam = ua_tmp->castparam;
5950 	ua->user_lksb = ua_tmp->user_lksb;
5951 
5952 	error = set_unlock_args(flags, ua, &args);
5953 	if (error)
5954 		goto out_put;
5955 
5956 	error = unlock_lock(ls, lkb, &args);
5957 
5958 	if (error == -DLM_EUNLOCK)
5959 		error = 0;
5960 	/* from validate_unlock_args() */
5961 	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5962 		error = 0;
5963 	if (error)
5964 		goto out_put;
5965 
5966 	spin_lock_bh(&ua->proc->locks_spin);
5967 	/* dlm_user_add_cb() may have already taken lkb off the proc list */
5968 	if (!list_empty(&lkb->lkb_ownqueue))
5969 		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5970 	spin_unlock_bh(&ua->proc->locks_spin);
5971  out_put:
5972 	trace_dlm_unlock_end(ls, lkb, flags, error);
5973 	dlm_put_lkb(lkb);
5974  out:
5975 	dlm_unlock_recovery(ls);
5976 	kfree(ua_tmp);
5977 	return error;
5978 }
5979 
5980 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5981 		    uint32_t flags, uint32_t lkid)
5982 {
5983 	struct dlm_lkb *lkb;
5984 	struct dlm_args args;
5985 	struct dlm_user_args *ua;
5986 	int error;
5987 
5988 	dlm_lock_recovery(ls);
5989 
5990 	error = find_lkb(ls, lkid, &lkb);
5991 	if (error)
5992 		goto out;
5993 
5994 	trace_dlm_unlock_start(ls, lkb, flags);
5995 
5996 	ua = lkb->lkb_ua;
5997 	if (ua_tmp->castparam)
5998 		ua->castparam = ua_tmp->castparam;
5999 	ua->user_lksb = ua_tmp->user_lksb;
6000 
6001 	error = set_unlock_args(flags, ua, &args);
6002 	if (error)
6003 		goto out_put;
6004 
6005 	error = cancel_lock(ls, lkb, &args);
6006 
6007 	if (error == -DLM_ECANCEL)
6008 		error = 0;
6009 	/* from validate_unlock_args() */
6010 	if (error == -EBUSY)
6011 		error = 0;
6012  out_put:
6013 	trace_dlm_unlock_end(ls, lkb, flags, error);
6014 	dlm_put_lkb(lkb);
6015  out:
6016 	dlm_unlock_recovery(ls);
6017 	kfree(ua_tmp);
6018 	return error;
6019 }
6020 
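/* Cancel a lock chosen as a deadlock victim.  This open-codes cancel_lock()
   so that DLM_IFL_DEADLOCK_CANCEL_BIT can be set on the lkb after lock_rsb(),
   as noted in the body below. */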
6021 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
6022 {
6023 	struct dlm_lkb *lkb;
6024 	struct dlm_args args;
6025 	struct dlm_user_args *ua;
6026 	struct dlm_rsb *r;
6027 	int error;
6028 
6029 	dlm_lock_recovery(ls);
6030 
6031 	error = find_lkb(ls, lkid, &lkb);
6032 	if (error)
6033 		goto out;
6034 
6035 	trace_dlm_unlock_start(ls, lkb, flags);
6036 
6037 	ua = lkb->lkb_ua;
6038 
6039 	error = set_unlock_args(flags, ua, &args);
6040 	if (error)
6041 		goto out_put;
6042 
6043 	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
6044 
6045 	r = lkb->lkb_resource;
6046 	hold_rsb(r);
6047 	lock_rsb(r);
6048 
6049 	error = validate_unlock_args(lkb, &args);
6050 	if (error)
6051 		goto out_r;
6052 	set_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags);
6053 
6054 	error = _cancel_lock(r, lkb);
6055  out_r:
6056 	unlock_rsb(r);
6057 	put_rsb(r);
6058 
6059 	if (error == -DLM_ECANCEL)
6060 		error = 0;
6061 	/* from validate_unlock_args() */
6062 	if (error == -EBUSY)
6063 		error = 0;
6064  out_put:
6065 	trace_dlm_unlock_end(ls, lkb, flags, error);
6066 	dlm_put_lkb(lkb);
6067  out:
6068 	dlm_unlock_recovery(ls);
6069 	return error;
6070 }
6071 
6072 /* lkb's that are removed from the waiters list by revert are just left on the
6073    orphans list with the granted orphan locks, to be freed by purge */
6074 
6075 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6076 {
6077 	struct dlm_args args;
6078 	int error;
6079 
6080 	hold_lkb(lkb); /* reference for the ls_orphans list */
6081 	spin_lock_bh(&ls->ls_orphans_lock);
6082 	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6083 	spin_unlock_bh(&ls->ls_orphans_lock);
6084 
6085 	set_unlock_args(0, lkb->lkb_ua, &args);
6086 
6087 	error = cancel_lock(ls, lkb, &args);
6088 	if (error == -DLM_ECANCEL)
6089 		error = 0;
6090 	return error;
6091 }
6092 
6093 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
6094    granted.  Regardless of what rsb queue the lock is on, it's removed and
6095    freed.  The IVVALBLK flag causes the lvb on the resource to be invalidated
6096    if our lock is PW/EX (it's ignored if our granted mode is smaller.) */
6097    if our lock is PW/EX (it's ignored if our granted mode is smaller). */
6098 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6099 {
6100 	struct dlm_args args;
6101 	int error;
6102 
6103 	set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
6104 			lkb->lkb_ua, &args);
6105 
6106 	error = unlock_lock(ls, lkb, &args);
6107 	if (error == -DLM_EUNLOCK)
6108 		error = 0;
6109 	return error;
6110 }
6111 
6112 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
6113 /* We have to release the clear_proc_locks spinlock before calling unlock_proc_lock()
6114    lock_rsb followed by dlm_user_add_cb() */
6115 
6116 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
6117 				     struct dlm_user_proc *proc)
6118 {
6119 	struct dlm_lkb *lkb = NULL;
6120 
6121 	spin_lock_bh(&ls->ls_clear_proc_locks);
6122 	if (list_empty(&proc->locks))
6123 		goto out;
6124 
6125 	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
6126 	list_del_init(&lkb->lkb_ownqueue);
6127 
6128 	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6129 		set_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags);
6130 	else
6131 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6132  out:
6133 	spin_unlock_bh(&ls->ls_clear_proc_locks);
6134 	return lkb;
6135 }
6136 
6137 /* The ls_clear_proc_locks spinlock protects against dlm_user_add_cb() which
6138    1) references lkb->lkb_ua, which we free here, and 2) adds lkbs to proc->asts,
6139    which we clear here. */
6140 
6141 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
6142    list, and no more device_writes should add lkb's to proc->locks list; so we
6143    shouldn't need to take asts_spin or locks_spin here.  This assumes that
6144    device reads/writes/closes are serialized -- FIXME: we may need to serialize
6145    them ourselves. */
6146 
6147 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6148 {
6149 	struct dlm_callback *cb, *cb_safe;
6150 	struct dlm_lkb *lkb, *safe;
6151 
6152 	dlm_lock_recovery(ls);
6153 
6154 	while (1) {
6155 		lkb = del_proc_lock(ls, proc);
6156 		if (!lkb)
6157 			break;
6158 		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6159 			orphan_proc_lock(ls, lkb);
6160 		else
6161 			unlock_proc_lock(ls, lkb);
6162 
6163 		/* this removes the reference for the proc->locks list
6164 		   added by dlm_user_request; it may result in the lkb
6165 		   being freed */
6166 
6167 		dlm_put_lkb(lkb);
6168 	}
6169 
6170 	spin_lock_bh(&ls->ls_clear_proc_locks);
6171 
6172 	/* in-progress unlocks */
6173 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6174 		list_del_init(&lkb->lkb_ownqueue);
6175 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6176 		dlm_put_lkb(lkb);
6177 	}
6178 
6179 	list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6180 		list_del(&cb->list);
6181 		dlm_free_cb(cb);
6182 	}
6183 
6184 	spin_unlock_bh(&ls->ls_clear_proc_locks);
6185 	dlm_unlock_recovery(ls);
6186 }
6187 
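/* Force-unlock every lock a process still holds and discard its in-progress
   unlocks and queued callbacks.  Used by dlm_user_purge() when a process
   purges locks under its own pid. */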
6188 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6189 {
6190 	struct dlm_callback *cb, *cb_safe;
6191 	struct dlm_lkb *lkb, *safe;
6192 
6193 	while (1) {
6194 		lkb = NULL;
6195 		spin_lock_bh(&proc->locks_spin);
6196 		if (!list_empty(&proc->locks)) {
6197 			lkb = list_entry(proc->locks.next, struct dlm_lkb,
6198 					 lkb_ownqueue);
6199 			list_del_init(&lkb->lkb_ownqueue);
6200 		}
6201 		spin_unlock_bh(&proc->locks_spin);
6202 
6203 		if (!lkb)
6204 			break;
6205 
6206 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6207 		unlock_proc_lock(ls, lkb);
6208 		dlm_put_lkb(lkb); /* ref from proc->locks list */
6209 	}
6210 
6211 	spin_lock_bh(&proc->locks_spin);
6212 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6213 		list_del_init(&lkb->lkb_ownqueue);
6214 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6215 		dlm_put_lkb(lkb);
6216 	}
6217 	spin_unlock_bh(&proc->locks_spin);
6218 
6219 	spin_lock_bh(&proc->asts_spin);
6220 	list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6221 		list_del(&cb->list);
6222 		dlm_free_cb(cb);
6223 	}
6224 	spin_unlock_bh(&proc->asts_spin);
6225 }
6226 
6227 /* pid of 0 means purge all orphans */
6228 
6229 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6230 {
6231 	struct dlm_lkb *lkb, *safe;
6232 
6233 	spin_lock_bh(&ls->ls_orphans_lock);
6234 	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6235 		if (pid && lkb->lkb_ownpid != pid)
6236 			continue;
6237 		unlock_proc_lock(ls, lkb);
6238 		list_del_init(&lkb->lkb_ownqueue);
6239 		dlm_put_lkb(lkb);
6240 	}
6241 	spin_unlock_bh(&ls->ls_orphans_lock);
6242 }
6243 
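/* Ask a remote node to purge orphan locks owned by the given pid
   (a pid of 0 means all orphans). */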
6244 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6245 {
6246 	struct dlm_message *ms;
6247 	struct dlm_mhandle *mh;
6248 	int error;
6249 
6250 	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6251 				DLM_MSG_PURGE, &ms, &mh);
6252 	if (error)
6253 		return error;
6254 	ms->m_nodeid = cpu_to_le32(nodeid);
6255 	ms->m_pid = cpu_to_le32(pid);
6256 
6257 	return send_message(mh, ms, NULL, 0);
6258 }
6259 
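/* Purge orphan locks owned by pid.  A purge aimed at another node is
   forwarded with send_purge(); locally, a purge of the caller's own pid
   releases the locks that process still holds (purge_proc_locks()), while
   other pids have their orphans dropped (do_purge()). */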
6260 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6261 		   int nodeid, int pid)
6262 {
6263 	int error = 0;
6264 
6265 	if (nodeid && (nodeid != dlm_our_nodeid())) {
6266 		error = send_purge(ls, nodeid, pid);
6267 	} else {
6268 		dlm_lock_recovery(ls);
6269 		if (pid == current->pid)
6270 			purge_proc_locks(ls, proc);
6271 		else
6272 			do_purge(ls, nodeid, pid);
6273 		dlm_unlock_recovery(ls);
6274 	}
6275 	return error;
6276 }
6277 
6278 /* debug functionality */
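/* Create an lkb with the given id, flags, nodeid and status and attach it to
   the named rsb, allowing debugging code to reconstruct lock state by hand. */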
6279 int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
6280 		      int lkb_nodeid, unsigned int lkb_dflags, int lkb_status)
6281 {
6282 	struct dlm_lksb *lksb;
6283 	struct dlm_lkb *lkb;
6284 	struct dlm_rsb *r;
6285 	int error;
6286 
6287 	/* we currently can't set a valid user lock */
6288 	if (lkb_dflags & BIT(DLM_DFL_USER_BIT))
6289 		return -EOPNOTSUPP;
6290 
6291 	lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
6292 	if (!lksb)
6293 		return -ENOMEM;
6294 
6295 	error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
6296 	if (error) {
6297 		kfree(lksb);
6298 		return error;
6299 	}
6300 
6301 	dlm_set_dflags_val(lkb, lkb_dflags);
6302 	lkb->lkb_nodeid = lkb_nodeid;
6303 	lkb->lkb_lksb = lksb;
6304 	/* user specific pointer, just don't have it NULL for kernel locks */
6305 	/* user-specific pointer; just don't have it NULL for kernel locks */
6306 		lkb->lkb_astparam = (void *)0xDEADBEEF;
6307 
6308 	error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
6309 	if (error) {
6310 		kfree(lksb);
6311 		__put_lkb(ls, lkb);
6312 		return error;
6313 	}
6314 
6315 	lock_rsb(r);
6316 	attach_lkb(r, lkb);
6317 	add_lkb(r, lkb, lkb_status);
6318 	unlock_rsb(r);
6319 	put_rsb(r);
6320 
6321 	return 0;
6322 }
6323 
6324 int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
6325 				 int mstype, int to_nodeid)
6326 {
6327 	struct dlm_lkb *lkb;
6328 	int error;
6329 
6330 	error = find_lkb(ls, lkb_id, &lkb);
6331 	if (error)
6332 		return error;
6333 
6334 	add_to_waiters(lkb, mstype, to_nodeid);
6335 	dlm_put_lkb(lkb);
6336 	return 0;
6337 }
6338 
6339