xref: /linux/fs/dlm/lock.c (revision 0de1e84263a2cc47fc642bcccee589a1e5220792)
1  // SPDX-License-Identifier: GPL-2.0-only
2  /******************************************************************************
3  *******************************************************************************
4  **
5  **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
6  **
7  **
8  *******************************************************************************
9  ******************************************************************************/
10  
11  /* Central locking logic has four stages:
12  
13     dlm_lock()
14     dlm_unlock()
15  
16     request_lock(ls, lkb)
17     convert_lock(ls, lkb)
18     unlock_lock(ls, lkb)
19     cancel_lock(ls, lkb)
20  
21     _request_lock(r, lkb)
22     _convert_lock(r, lkb)
23     _unlock_lock(r, lkb)
24     _cancel_lock(r, lkb)
25  
26     do_request(r, lkb)
27     do_convert(r, lkb)
28     do_unlock(r, lkb)
29     do_cancel(r, lkb)
30  
31     Stage 1 (lock, unlock) is mainly about checking input args and
32     splitting into one of the four main operations:
33  
34         dlm_lock          = request_lock
35         dlm_lock+CONVERT  = convert_lock
36         dlm_unlock        = unlock_lock
37         dlm_unlock+CANCEL = cancel_lock
38  
39     Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
40     provided to the next stage.
41  
42     Stage 3, _xxxx_lock(), determines if the operation is local or remote.
43     When remote, it calls send_xxxx(), when local it calls do_xxxx().
44  
45     Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
46     given rsb and lkb and queues callbacks.
47  
48     For remote operations, send_xxxx() results in the corresponding do_xxxx()
49     function being executed on the remote node.  The connecting send/receive
50     calls on local (L) and remote (R) nodes:
51  
52     L: send_xxxx()              ->  R: receive_xxxx()
53                                     R: do_xxxx()
54     L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
55  */
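/*
 * For illustration, a minimal sketch of how a hypothetical in-kernel
 * caller reaches the four stage-1 operations above, assuming a lockspace
 * "ls" created with dlm_new_lockspace() and <linux/dlm.h>; ast_fn,
 * bast_fn and arg are placeholder callbacks/argument:
 *
 *	struct dlm_lksb lksb = {};
 *
 *	// dlm_lock                    -> request_lock()
 *	dlm_lock(ls, DLM_LOCK_EX, &lksb, 0, "res1", 4, 0,
 *		 ast_fn, arg, bast_fn);
 *	// dlm_lock + DLM_LKF_CONVERT  -> convert_lock()
 *	dlm_lock(ls, DLM_LOCK_PR, &lksb, DLM_LKF_CONVERT, "res1", 4, 0,
 *		 ast_fn, arg, bast_fn);
 *	// dlm_unlock                  -> unlock_lock()
 *	dlm_unlock(ls, lksb.sb_lkid, 0, &lksb, arg);
 *	// dlm_unlock + DLM_LKF_CANCEL -> cancel_lock()
 *	dlm_unlock(ls, lksb.sb_lkid, DLM_LKF_CANCEL, &lksb, arg);
 */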
56  #include <trace/events/dlm.h>
57  
58  #include <linux/types.h>
59  #include <linux/rbtree.h>
60  #include <linux/slab.h>
61  #include "dlm_internal.h"
62  #include <linux/dlm_device.h>
63  #include "memory.h"
64  #include "midcomms.h"
65  #include "requestqueue.h"
66  #include "util.h"
67  #include "dir.h"
68  #include "member.h"
69  #include "lockspace.h"
70  #include "ast.h"
71  #include "lock.h"
72  #include "rcom.h"
73  #include "recover.h"
74  #include "lvb_table.h"
75  #include "user.h"
76  #include "config.h"
77  
78  static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79  static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80  static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81  static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82  static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83  static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84  static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85  static int send_remove(struct dlm_rsb *r);
86  static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87  static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88  static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89  				    const struct dlm_message *ms, bool local);
90  static int receive_extralen(const struct dlm_message *ms);
91  static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92  static void deactivate_rsb(struct kref *kref);
93  
94  /*
95   * Lock compatibility matrix - thanks Steve
96   * UN = Unlocked state. Not really a state, used as a flag
97   * PD = Padding. Used to make the matrix a nice power of two in size
98   * Other states are the same as the VMS DLM.
99   * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
100   */
101  
102  static const int __dlm_compat_matrix[8][8] = {
103        /* UN NL CR CW PR PW EX PD */
104          {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
105          {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
106          {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
107          {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
108          {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
109          {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
110          {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
111          {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
112  };
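/*
 * For example: DLM_LOCK_PR is 3 and DLM_LOCK_CW is 2, so
 * __dlm_compat_matrix[DLM_LOCK_PR + 1][DLM_LOCK_CW + 1] is the PR row,
 * CW column, which is 0: a CW request cannot be granted while PR is held.
 * The PR row, CR column is 1, so CR and PR can be granted together.
 */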
113  
114  /*
115   * This defines the direction of transfer of LVB data.
116   * Granted mode is the row; requested mode is the column.
117   * Usage: matrix[grmode+1][rqmode+1]
118   * 1 = LVB is returned to the caller
119   * 0 = LVB is written to the resource
120   * -1 = nothing happens to the LVB
121   */
122  
123  const int dlm_lvb_operations[8][8] = {
124          /* UN   NL  CR  CW  PR  PW  EX  PD*/
125          {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
126          {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
127          {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
128          {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
129          {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
130          {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
131          {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
132          {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
133  };
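/*
 * For example, on an EX -> NL down-conversion (EX row, NL column = 0) the
 * lock's LVB is written to the resource, while on an NL -> EX
 * up-conversion (NL row, EX column = 1) the resource's LVB is returned to
 * the caller.  A new request (granted mode UN) always has the current LVB
 * returned to the caller.
 */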
134  
135  #define modes_compat(gr, rq) \
136  	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
137  
138  int dlm_modes_compat(int mode1, int mode2)
139  {
140  	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
141  }
142  
143  /*
144   * Compatibility matrix for conversions with QUECVT set.
145   * Granted mode is the row; requested mode is the column.
146   * Usage: matrix[grmode+1][rqmode+1]
147   */
148  
149  static const int __quecvt_compat_matrix[8][8] = {
150        /* UN NL CR CW PR PW EX PD */
151          {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
152          {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
153          {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
154          {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
155          {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
156          {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
157          {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
158          {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
159  };
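/*
 * For example, __quecvt_compat_matrix[DLM_LOCK_NL + 1][DLM_LOCK_EX + 1]
 * is 1, so an NL -> EX conversion may use DLM_LKF_QUECVT, while the EX
 * row is all zeroes: QUECVT is not accepted for any conversion out of EX.
 */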
160  
161  void dlm_print_lkb(struct dlm_lkb *lkb)
162  {
163  	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
164  	       "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
165  	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166  	       dlm_iflags_val(lkb), lkb->lkb_status, lkb->lkb_rqmode,
167  	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
168  	       (unsigned long long)lkb->lkb_recover_seq);
169  }
170  
171  static void dlm_print_rsb(struct dlm_rsb *r)
172  {
173  	printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
174  	       "rlc %d name %s\n",
175  	       r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
176  	       r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
177  	       r->res_name);
178  }
179  
180  void dlm_dump_rsb(struct dlm_rsb *r)
181  {
182  	struct dlm_lkb *lkb;
183  
184  	dlm_print_rsb(r);
185  
186  	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
187  	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
188  	printk(KERN_ERR "rsb lookup list\n");
189  	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
190  		dlm_print_lkb(lkb);
191  	printk(KERN_ERR "rsb grant queue:\n");
192  	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
193  		dlm_print_lkb(lkb);
194  	printk(KERN_ERR "rsb convert queue:\n");
195  	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
196  		dlm_print_lkb(lkb);
197  	printk(KERN_ERR "rsb wait queue:\n");
198  	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
199  		dlm_print_lkb(lkb);
200  }
201  
202  /* Threads cannot use the lockspace while it's being recovered */
203  
204  void dlm_lock_recovery(struct dlm_ls *ls)
205  {
206  	down_read(&ls->ls_in_recovery);
207  }
208  
209  void dlm_unlock_recovery(struct dlm_ls *ls)
210  {
211  	up_read(&ls->ls_in_recovery);
212  }
213  
214  int dlm_lock_recovery_try(struct dlm_ls *ls)
215  {
216  	return down_read_trylock(&ls->ls_in_recovery);
217  }
218  
219  static inline int can_be_queued(struct dlm_lkb *lkb)
220  {
221  	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
222  }
223  
224  static inline int force_blocking_asts(struct dlm_lkb *lkb)
225  {
226  	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
227  }
228  
229  static inline int is_demoted(struct dlm_lkb *lkb)
230  {
231  	return test_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
232  }
233  
234  static inline int is_altmode(struct dlm_lkb *lkb)
235  {
236  	return test_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
237  }
238  
239  static inline int is_granted(struct dlm_lkb *lkb)
240  {
241  	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
242  }
243  
244  static inline int is_remote(struct dlm_rsb *r)
245  {
246  	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
247  	return !!r->res_nodeid;
248  }
249  
250  static inline int is_process_copy(struct dlm_lkb *lkb)
251  {
252  	return lkb->lkb_nodeid &&
253  	       !test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
254  }
255  
256  static inline int is_master_copy(struct dlm_lkb *lkb)
257  {
258  	return test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
259  }
260  
261  static inline int middle_conversion(struct dlm_lkb *lkb)
262  {
263  	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264  	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265  		return 1;
266  	return 0;
267  }
268  
269  static inline int down_conversion(struct dlm_lkb *lkb)
270  {
271  	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272  }
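/* Conversions between PR and CW are the "middle" conversions: CW is
 * numerically lower than PR, but the two modes are mutually incompatible
 * (see __dlm_compat_matrix above), so a PR <-> CW conversion cannot be
 * treated as a down-conversion that can always be granted in place.
 */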
273  
274  static inline int is_overlap_unlock(struct dlm_lkb *lkb)
275  {
276  	return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
277  }
278  
279  static inline int is_overlap_cancel(struct dlm_lkb *lkb)
280  {
281  	return test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
282  }
283  
284  static inline int is_overlap(struct dlm_lkb *lkb)
285  {
286  	return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags) ||
287  	       test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
288  }
289  
290  static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291  {
292  	if (is_master_copy(lkb))
293  		return;
294  
295  	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
296  
297  	if (rv == -DLM_ECANCEL &&
298  	    test_and_clear_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags))
299  		rv = -EDEADLK;
300  
301  	dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, dlm_sbflags_val(lkb));
302  }
303  
304  static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
305  {
306  	queue_cast(r, lkb,
307  		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
308  }
309  
310  static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
311  {
312  	if (is_master_copy(lkb)) {
313  		send_bast(r, lkb, rqmode);
314  	} else {
315  		dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
316  	}
317  }
318  
319  /*
320   * Basic operations on rsb's and lkb's
321   */
322  
323  static inline unsigned long rsb_toss_jiffies(void)
324  {
325  	return jiffies + (READ_ONCE(dlm_config.ci_toss_secs) * HZ);
326  }
327  
328  /* This is only called to add a reference when the code already holds
329     a valid reference to the rsb, so there's no need for locking. */
330  
331  static inline void hold_rsb(struct dlm_rsb *r)
332  {
333  	/* inactive rsbs are not ref counted */
334  	WARN_ON(rsb_flag(r, RSB_INACTIVE));
335  	kref_get(&r->res_ref);
336  }
337  
338  void dlm_hold_rsb(struct dlm_rsb *r)
339  {
340  	hold_rsb(r);
341  }
342  
343  /* TODO move this to lib/refcount.c */
344  static __must_check bool
345  dlm_refcount_dec_and_write_lock_bh(refcount_t *r, rwlock_t *lock)
346  __cond_acquires(lock)
347  {
348  	if (refcount_dec_not_one(r))
349  		return false;
350  
351  	write_lock_bh(lock);
352  	if (!refcount_dec_and_test(r)) {
353  		write_unlock_bh(lock);
354  		return false;
355  	}
356  
357  	return true;
358  }
359  
360  /* TODO move this to include/linux/kref.h */
361  static inline int dlm_kref_put_write_lock_bh(struct kref *kref,
362  					     void (*release)(struct kref *kref),
363  					     rwlock_t *lock)
364  {
365  	if (dlm_refcount_dec_and_write_lock_bh(&kref->refcount, lock)) {
366  		release(kref);
367  		return 1;
368  	}
369  
370  	return 0;
371  }
372  
373  static void put_rsb(struct dlm_rsb *r)
374  {
375  	struct dlm_ls *ls = r->res_ls;
376  	int rv;
377  
378  	rv = dlm_kref_put_write_lock_bh(&r->res_ref, deactivate_rsb,
379  					&ls->ls_rsbtbl_lock);
380  	if (rv)
381  		write_unlock_bh(&ls->ls_rsbtbl_lock);
382  }
383  
384  void dlm_put_rsb(struct dlm_rsb *r)
385  {
386  	put_rsb(r);
387  }
388  
389  /* Paired with timer_delete_sync() in dlm_ls_stop(): don't arm new
390   * timers once recovery has been triggered; the timer is not run
391   * again until resume_scan_timer() re-arms it.
392   */
393  static void enable_scan_timer(struct dlm_ls *ls, unsigned long jiffies)
394  {
395  	if (!dlm_locking_stopped(ls))
396  		mod_timer(&ls->ls_scan_timer, jiffies);
397  }
398  
399  /* This function tries to resume the timer callback if an rsb
400   * is on the scan list and no timer is pending. The first entry
401   * might currently be running as the timer callback, but we don't
402   * care if the timer gets queued up again and then does nothing.
403   * Should be a rare case.
404   */
405  void resume_scan_timer(struct dlm_ls *ls)
406  {
407  	struct dlm_rsb *r;
408  
409  	spin_lock_bh(&ls->ls_scan_lock);
410  	r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
411  				     res_scan_list);
412  	if (r && !timer_pending(&ls->ls_scan_timer))
413  		enable_scan_timer(ls, r->res_toss_time);
414  	spin_unlock_bh(&ls->ls_scan_lock);
415  }
416  
417  /* ls_rsbtbl_lock must be held */
418  
419  static void del_scan(struct dlm_ls *ls, struct dlm_rsb *r)
420  {
421  	struct dlm_rsb *first;
422  
423  	/* active rsbs should never be on the scan list */
424  	WARN_ON(!rsb_flag(r, RSB_INACTIVE));
425  
426  	spin_lock_bh(&ls->ls_scan_lock);
427  	r->res_toss_time = 0;
428  
429  	/* if the rsb is not queued do nothing */
430  	if (list_empty(&r->res_scan_list))
431  		goto out;
432  
433  	/* get the first element before delete */
434  	first = list_first_entry(&ls->ls_scan_list, struct dlm_rsb,
435  				 res_scan_list);
436  	list_del_init(&r->res_scan_list);
437  	/* check if the first element was the rsb we deleted */
438  	if (first == r) {
439  		/* get the new first element; if the list is now empty,
440  		 * try to delete the timer (if we're too late we don't
441  		 * care).
442  		 *
443  		 * if the list isn't empty and a new first element is in
444  		 * place, set the new timer expire time.
445  		 */
446  		first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
447  						 res_scan_list);
448  		if (!first)
449  			timer_delete(&ls->ls_scan_timer);
450  		else
451  			enable_scan_timer(ls, first->res_toss_time);
452  	}
453  
454  out:
455  	spin_unlock_bh(&ls->ls_scan_lock);
456  }
457  
458  static void add_scan(struct dlm_ls *ls, struct dlm_rsb *r)
459  {
460  	int our_nodeid = dlm_our_nodeid();
461  	struct dlm_rsb *first;
462  
463  	/* A dir record for a remote master rsb should never be on the scan list. */
464  	WARN_ON(!dlm_no_directory(ls) &&
465  		(r->res_master_nodeid != our_nodeid) &&
466  		(dlm_dir_nodeid(r) == our_nodeid));
467  
468  	/* An active rsb should never be on the scan list. */
469  	WARN_ON(!rsb_flag(r, RSB_INACTIVE));
470  
471  	/* An rsb should not already be on the scan list. */
472  	WARN_ON(!list_empty(&r->res_scan_list));
473  
474  	spin_lock_bh(&ls->ls_scan_lock);
475  	/* set the new rsb absolute expire time in the rsb */
476  	r->res_toss_time = rsb_toss_jiffies();
477  	if (list_empty(&ls->ls_scan_list)) {
478  		/* if the queue is empty add the element and it's
479  		 * our new expire time
480  		 */
481  		list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
482  		enable_scan_timer(ls, r->res_toss_time);
483  	} else {
484  		/* get the (possibly new) first element, then add this
485  		 * rsb, which has the latest expire time, to the end of
486  		 * the queue. If the list turned out to be empty, this
487  		 * rsb's expire time is our next expiration; otherwise
488  		 * the first element's expire time is.
489  		 */
490  		first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
491  						 res_scan_list);
492  		list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
493  		if (!first)
494  			enable_scan_timer(ls, r->res_toss_time);
495  		else
496  			enable_scan_timer(ls, first->res_toss_time);
497  	}
498  	spin_unlock_bh(&ls->ls_scan_lock);
499  }
500  
501  /* If we hit contention we retry the trylock after 250 ms.
502   * If any other mod_timer() happens in between we don't care
503   * that the timer expires earlier again; this is only for the
504   * unlikely case that nothing happened in this time.
505   */
506  #define DLM_TOSS_TIMER_RETRY	(jiffies + msecs_to_jiffies(250))
507  
508  /* Called by lockspace scan_timer to free unused rsb's. */
509  
510  void dlm_rsb_scan(struct timer_list *timer)
511  {
512  	struct dlm_ls *ls = from_timer(ls, timer, ls_scan_timer);
513  	int our_nodeid = dlm_our_nodeid();
514  	struct dlm_rsb *r;
515  	int rv;
516  
517  	while (1) {
518  		/* interruption point to leave the iteration when
519  		 * recovery waits for timer_delete_sync(); recovery
520  		 * will take care of deleting everything on the scan list.
521  		 */
522  		if (dlm_locking_stopped(ls))
523  			break;
524  
525  		rv = spin_trylock(&ls->ls_scan_lock);
526  		if (!rv) {
527  			/* rearm the retry timer */
528  			enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
529  			break;
530  		}
531  
532  		r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
533  					     res_scan_list);
534  		if (!r) {
535  			/* the next add_scan will enable the timer again */
536  			spin_unlock(&ls->ls_scan_lock);
537  			break;
538  		}
539  
540  		/*
541  		 * If the first rsb is not yet expired, then stop because the
542  		 * list is sorted with nearest expiration first.
543  		 */
544  		if (time_before(jiffies, r->res_toss_time)) {
545  			/* rearm with the next rsb to expire in the future */
546  			enable_scan_timer(ls, r->res_toss_time);
547  			spin_unlock(&ls->ls_scan_lock);
548  			break;
549  		}
550  
551  		/* find_rsb_dir/nodir take these locks in the reverse
552  		 * order; however, this is only a trylock, so if we hit
553  		 * possible contention we simply try again.
554  		 */
555  		rv = write_trylock(&ls->ls_rsbtbl_lock);
556  		if (!rv) {
557  			spin_unlock(&ls->ls_scan_lock);
558  			/* rearm the retry timer */
559  			enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
560  			break;
561  		}
562  
563  		list_del(&r->res_slow_list);
564  		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
565  				       dlm_rhash_rsb_params);
566  		rsb_clear_flag(r, RSB_HASHED);
567  
568  		/* ls_rsbtbl_lock is not needed when calling send_remove() */
569  		write_unlock(&ls->ls_rsbtbl_lock);
570  
571  		list_del_init(&r->res_scan_list);
572  		spin_unlock(&ls->ls_scan_lock);
573  
574  		/* An rsb that is a dir record for a remote master rsb
575  		 * cannot be removed, and should not have a timer enabled.
576  		 */
577  		WARN_ON(!dlm_no_directory(ls) &&
578  			(r->res_master_nodeid != our_nodeid) &&
579  			(dlm_dir_nodeid(r) == our_nodeid));
580  
581  		/* We're the master of this rsb but we're not
582  		 * the directory record, so we need to tell the
583  		 * dir node to remove the dir record
584  		 */
585  		if (!dlm_no_directory(ls) &&
586  		    (r->res_master_nodeid == our_nodeid) &&
587  		    (dlm_dir_nodeid(r) != our_nodeid))
588  			send_remove(r);
589  
590  		free_inactive_rsb(r);
591  	}
592  }
593  
594  /* Allocate and initialize a new rsb struct for the given name and
595     length; the caller fills in the remaining fields.  Returns -ENOMEM
596     if the allocation fails. */
597  
598  static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
599  			  struct dlm_rsb **r_ret)
600  {
601  	struct dlm_rsb *r;
602  
603  	r = dlm_allocate_rsb();
604  	if (!r)
605  		return -ENOMEM;
606  
607  	r->res_ls = ls;
608  	r->res_length = len;
609  	memcpy(r->res_name, name, len);
610  	spin_lock_init(&r->res_lock);
611  
612  	INIT_LIST_HEAD(&r->res_lookup);
613  	INIT_LIST_HEAD(&r->res_grantqueue);
614  	INIT_LIST_HEAD(&r->res_convertqueue);
615  	INIT_LIST_HEAD(&r->res_waitqueue);
616  	INIT_LIST_HEAD(&r->res_root_list);
617  	INIT_LIST_HEAD(&r->res_scan_list);
618  	INIT_LIST_HEAD(&r->res_recover_list);
619  	INIT_LIST_HEAD(&r->res_masters_list);
620  
621  	*r_ret = r;
622  	return 0;
623  }
624  
625  int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
626  			struct dlm_rsb **r_ret)
627  {
628  	char key[DLM_RESNAME_MAXLEN] = {};
629  
630  	memcpy(key, name, len);
631  	*r_ret = rhashtable_lookup_fast(rhash, &key, dlm_rhash_rsb_params);
632  	if (*r_ret)
633  		return 0;
634  
635  	return -EBADR;
636  }
637  
638  static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
639  {
640  	int rv;
641  
642  	rv = rhashtable_insert_fast(rhash, &rsb->res_node,
643  				    dlm_rhash_rsb_params);
644  	if (!rv)
645  		rsb_set_flag(rsb, RSB_HASHED);
646  
647  	return rv;
648  }
649  
650  /*
651   * Find rsb in rsbtbl and potentially create/add one
652   *
653   * Delaying the release of rsb's has a similar benefit to applications keeping
654   * NL locks on an rsb, but without the guarantee that the cached master value
655   * will still be valid when the rsb is reused.  Apps aren't always smart enough
656   * to keep NL locks on an rsb that they may lock again shortly; this can lead
657   * to excessive master lookups and removals if we don't delay the release.
658   *
659   * Searching for an rsb means looking through both the normal list and toss
660   * list.  When found on the toss list the rsb is moved to the normal list with
661   * ref count of 1; when found on normal list the ref count is incremented.
662   *
663   * rsb's on the keep list are being used locally and refcounted.
664   * rsb's on the toss list are not being used locally, and are not refcounted.
665   *
666   * The toss list rsb's were either
667   * - previously used locally but not any more (were on keep list, then
668   *   moved to toss list when last refcount dropped)
669   * - created and put on toss list as a directory record for a lookup
670   *   (we are the dir node for the res, but are not using the res right now,
671   *   but some other node is)
672   *
673   * The purpose of find_rsb() is to return a refcounted rsb for local use.
674   * So, if the given rsb is on the toss list, it is moved to the keep list
675   * before being returned.
676   *
677   * deactivate_rsb() happens when all local usage of the rsb is done, i.e. no
678   * more refcounts exist, so the rsb is moved from the keep list to the
679   * toss list.
680   *
681   * rsb's on both keep and toss lists are used for doing a name to master
682   * lookups.  rsb's that are in use locally (and being refcounted) are on
683   * the keep list, rsb's that are not in use locally (not refcounted) and
684   * only exist for name/master lookups are on the toss list.
685   *
686   * rsb's on the toss list whose dir_nodeid is not local can have stale
687   * name/master mappings.  So, remote requests on such rsb's can potentially
688   * return with an error, which means the mapping is stale and needs to
689   * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
690   * first_lkid is to keep only a single outstanding request on an rsb
691   * while that rsb has a potentially stale master.)
692   */
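/*
 * Informal lifecycle summary, based on the functions below:
 *
 * - active: created or reactivated by find_rsb_dir/nodir, refcounted,
 *   kept on ls_slow_active.
 * - inactive: when the last reference is dropped, deactivate_rsb() sets
 *   RSB_INACTIVE, moves the rsb to ls_slow_inactive and, unless it must
 *   stay around as a dir record, puts it on the scan list (add_scan).
 * - reuse or removal: a new lookup moves it back to active; otherwise
 *   dlm_rsb_scan() eventually removes it from the hash table, tells the
 *   dir node to drop its record (send_remove) if needed, and frees it
 *   (free_inactive_rsb).
 */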
693  
694  static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
695  			uint32_t hash, int dir_nodeid, int from_nodeid,
696  			unsigned int flags, struct dlm_rsb **r_ret)
697  {
698  	struct dlm_rsb *r = NULL;
699  	int our_nodeid = dlm_our_nodeid();
700  	int from_local = 0;
701  	int from_other = 0;
702  	int from_dir = 0;
703  	int create = 0;
704  	int error;
705  
706  	if (flags & R_RECEIVE_REQUEST) {
707  		if (from_nodeid == dir_nodeid)
708  			from_dir = 1;
709  		else
710  			from_other = 1;
711  	} else if (flags & R_REQUEST) {
712  		from_local = 1;
713  	}
714  
715  	/*
716  	 * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
717  	 * from_nodeid has sent us a lock in dlm_recover_locks, believing
718  	 * we're the new master.  Our local recovery may not have set
719  	 * res_master_nodeid to our_nodeid yet, so allow either.  Don't
720  	 * create the rsb; dlm_recover_process_copy() will handle EBADR
721  	 * by resending.
722  	 *
723  	 * If someone sends us a request, we are the dir node, and we do
724  	 * not find the rsb anywhere, then recreate it.  This happens if
725  	 * someone sends us a request after we have removed/freed an rsb.
726  	 * (They sent a request instead of lookup because they are using
727  	 * an rsb taken from their scan list.)
728  	 */
729  
730  	if (from_local || from_dir ||
731  	    (from_other && (dir_nodeid == our_nodeid))) {
732  		create = 1;
733  	}
734  
735   retry:
736  	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
737  	if (error)
738  		goto do_new;
739  
740  	/* check if the rsb is active under read lock - likely path */
741  	read_lock_bh(&ls->ls_rsbtbl_lock);
742  	if (!rsb_flag(r, RSB_HASHED)) {
743  		read_unlock_bh(&ls->ls_rsbtbl_lock);
744  		error = -EBADR;
745  		goto do_new;
746  	}
747  
748  	/*
749  	 * rsb is active, so we can't check master_nodeid without lock_rsb.
750  	 */
751  
752  	if (rsb_flag(r, RSB_INACTIVE)) {
753  		read_unlock_bh(&ls->ls_rsbtbl_lock);
754  		goto do_inactive;
755  	}
756  
757  	kref_get(&r->res_ref);
758  	read_unlock_bh(&ls->ls_rsbtbl_lock);
759  	goto out;
760  
761  
762   do_inactive:
763  	write_lock_bh(&ls->ls_rsbtbl_lock);
764  
765  	/*
766  	 * The expectation here is that the rsb will have HASHED and
767  	 * INACTIVE flags set, and that the rsb can be moved from
768  	 * inactive back to active again.  However, between releasing
769  	 * the read lock and acquiring the write lock, this rsb could
770  	 * have been removed from rsbtbl, and had HASHED cleared, to
771  	 * be freed.  To deal with this case, we would normally need
772  	 * to repeat dlm_search_rsb_tree while holding the write lock,
773  	 * but rcu allows us to simply check the HASHED flag, because
774  	 * the rcu read lock means the rsb will not be freed yet.
775  	 * If the HASHED flag is not set, then the rsb is being freed,
776  	 * so we add a new rsb struct.  If the HASHED flag is set,
777  	 * and INACTIVE is not set, it means another thread has
778  	 * made the rsb active, as we're expecting to do here, and
779  	 * we just repeat the lookup (this will be very unlikely.)
780  	 */
781  	if (rsb_flag(r, RSB_HASHED)) {
782  		if (!rsb_flag(r, RSB_INACTIVE)) {
783  			write_unlock_bh(&ls->ls_rsbtbl_lock);
784  			goto retry;
785  		}
786  	} else {
787  		write_unlock_bh(&ls->ls_rsbtbl_lock);
788  		error = -EBADR;
789  		goto do_new;
790  	}
791  
792  	/*
793  	 * rsb found inactive (master_nodeid may be out of date unless
794  	 * we are the dir_nodeid or were the master)  No other thread
795  	 * is using this rsb because it's inactive, so we can
796  	 * look at or update res_master_nodeid without lock_rsb.
797  	 */
798  
799  	if ((r->res_master_nodeid != our_nodeid) && from_other) {
800  		/* our rsb was not master, and another node (not the dir node)
801  		   has sent us a request */
802  		log_debug(ls, "find_rsb inactive from_other %d master %d dir %d %s",
803  			  from_nodeid, r->res_master_nodeid, dir_nodeid,
804  			  r->res_name);
805  		write_unlock_bh(&ls->ls_rsbtbl_lock);
806  		error = -ENOTBLK;
807  		goto out;
808  	}
809  
810  	if ((r->res_master_nodeid != our_nodeid) && from_dir) {
811  		/* don't think this should ever happen */
812  		log_error(ls, "find_rsb inactive from_dir %d master %d",
813  			  from_nodeid, r->res_master_nodeid);
814  		dlm_print_rsb(r);
815  		/* fix it and go on */
816  		r->res_master_nodeid = our_nodeid;
817  		r->res_nodeid = 0;
818  		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
819  		r->res_first_lkid = 0;
820  	}
821  
822  	if (from_local && (r->res_master_nodeid != our_nodeid)) {
823  		/* Because we have held no locks on this rsb,
824  		   res_master_nodeid could have become stale. */
825  		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
826  		r->res_first_lkid = 0;
827  	}
828  
829  	/* we always deactivate scan timer for the rsb, when
830  	 * we move it out of the inactive state as rsb state
831  	 * can be changed and scan timers are only for inactive
832  	 * rsbs.
833  	 */
834  	del_scan(ls, r);
835  	list_move(&r->res_slow_list, &ls->ls_slow_active);
836  	rsb_clear_flag(r, RSB_INACTIVE);
837  	kref_init(&r->res_ref); /* ref is now used in active state */
838  	write_unlock_bh(&ls->ls_rsbtbl_lock);
839  
840  	goto out;
841  
842  
843   do_new:
844  	/*
845  	 * rsb not found
846  	 */
847  
848  	if (error == -EBADR && !create)
849  		goto out;
850  
851  	error = get_rsb_struct(ls, name, len, &r);
852  	if (WARN_ON_ONCE(error))
853  		goto out;
854  
855  	r->res_hash = hash;
856  	r->res_dir_nodeid = dir_nodeid;
857  	kref_init(&r->res_ref);
858  
859  	if (from_dir) {
860  		/* want to see how often this happens */
861  		log_debug(ls, "find_rsb new from_dir %d recreate %s",
862  			  from_nodeid, r->res_name);
863  		r->res_master_nodeid = our_nodeid;
864  		r->res_nodeid = 0;
865  		goto out_add;
866  	}
867  
868  	if (from_other && (dir_nodeid != our_nodeid)) {
869  		/* should never happen */
870  		log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
871  			  from_nodeid, dir_nodeid, our_nodeid, r->res_name);
872  		dlm_free_rsb(r);
873  		r = NULL;
874  		error = -ENOTBLK;
875  		goto out;
876  	}
877  
878  	if (from_other) {
879  		log_debug(ls, "find_rsb new from_other %d dir %d %s",
880  			  from_nodeid, dir_nodeid, r->res_name);
881  	}
882  
883  	if (dir_nodeid == our_nodeid) {
884  		/* When we are the dir nodeid, we can set the master
885  		   node immediately */
886  		r->res_master_nodeid = our_nodeid;
887  		r->res_nodeid = 0;
888  	} else {
889  		/* set_master will send_lookup to dir_nodeid */
890  		r->res_master_nodeid = 0;
891  		r->res_nodeid = -1;
892  	}
893  
894   out_add:
895  
896  	write_lock_bh(&ls->ls_rsbtbl_lock);
897  	error = rsb_insert(r, &ls->ls_rsbtbl);
898  	if (error == -EEXIST) {
899  		/* somebody else was faster and it seems the
900  		 * rsb exists now, we do a whole relookup
901  		 */
902  		write_unlock_bh(&ls->ls_rsbtbl_lock);
903  		dlm_free_rsb(r);
904  		goto retry;
905  	} else if (!error) {
906  		list_add(&r->res_slow_list, &ls->ls_slow_active);
907  	}
908  	write_unlock_bh(&ls->ls_rsbtbl_lock);
909   out:
910  	*r_ret = r;
911  	return error;
912  }
913  
914  /* During recovery, other nodes can send us new MSTCPY locks (from
915     dlm_recover_locks) before we've made ourself master (in
916     dlm_recover_masters). */
917  
918  static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
919  			  uint32_t hash, int dir_nodeid, int from_nodeid,
920  			  unsigned int flags, struct dlm_rsb **r_ret)
921  {
922  	struct dlm_rsb *r = NULL;
923  	int our_nodeid = dlm_our_nodeid();
924  	int recover = (flags & R_RECEIVE_RECOVER);
925  	int error;
926  
927   retry:
928  	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
929  	if (error)
930  		goto do_new;
931  
932  	/* check if the rsb is in active state under read lock - likely path */
933  	read_lock_bh(&ls->ls_rsbtbl_lock);
934  	if (!rsb_flag(r, RSB_HASHED)) {
935  		read_unlock_bh(&ls->ls_rsbtbl_lock);
936  		goto do_new;
937  	}
938  
939  	if (rsb_flag(r, RSB_INACTIVE)) {
940  		read_unlock_bh(&ls->ls_rsbtbl_lock);
941  		goto do_inactive;
942  	}
943  
944  	/*
945  	 * rsb is active, so we can't check master_nodeid without lock_rsb.
946  	 */
947  
948  	kref_get(&r->res_ref);
949  	read_unlock_bh(&ls->ls_rsbtbl_lock);
950  
951  	goto out;
952  
953  
954   do_inactive:
955  	write_lock_bh(&ls->ls_rsbtbl_lock);
956  
957  	/* See comment in find_rsb_dir. */
958  	if (rsb_flag(r, RSB_HASHED)) {
959  		if (!rsb_flag(r, RSB_INACTIVE)) {
960  			write_unlock_bh(&ls->ls_rsbtbl_lock);
961  			goto retry;
962  		}
963  	} else {
964  		write_unlock_bh(&ls->ls_rsbtbl_lock);
965  		goto do_new;
966  	}
967  
968  
969  	/*
970  	 * rsb found inactive. No other thread is using this rsb because
971  	 * it's inactive, so we can look at or update res_master_nodeid
972  	 * without lock_rsb.
973  	 */
974  
975  	if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
976  		/* our rsb is not master, and another node has sent us a
977  		   request; this should never happen */
978  		log_error(ls, "find_rsb inactive from_nodeid %d master %d dir %d",
979  			  from_nodeid, r->res_master_nodeid, dir_nodeid);
980  		dlm_print_rsb(r);
981  		write_unlock_bh(&ls->ls_rsbtbl_lock);
982  		error = -ENOTBLK;
983  		goto out;
984  	}
985  
986  	if (!recover && (r->res_master_nodeid != our_nodeid) &&
987  	    (dir_nodeid == our_nodeid)) {
988  		/* our rsb is not master, and we are dir; may as well fix it;
989  		   this should never happen */
990  		log_error(ls, "find_rsb inactive our %d master %d dir %d",
991  			  our_nodeid, r->res_master_nodeid, dir_nodeid);
992  		dlm_print_rsb(r);
993  		r->res_master_nodeid = our_nodeid;
994  		r->res_nodeid = 0;
995  	}
996  
997  	del_scan(ls, r);
998  	list_move(&r->res_slow_list, &ls->ls_slow_active);
999  	rsb_clear_flag(r, RSB_INACTIVE);
1000  	kref_init(&r->res_ref);
1001  	write_unlock_bh(&ls->ls_rsbtbl_lock);
1002  
1003  	goto out;
1004  
1005  
1006   do_new:
1007  	/*
1008  	 * rsb not found
1009  	 */
1010  
1011  	error = get_rsb_struct(ls, name, len, &r);
1012  	if (WARN_ON_ONCE(error))
1013  		goto out;
1014  
1015  	r->res_hash = hash;
1016  	r->res_dir_nodeid = dir_nodeid;
1017  	r->res_master_nodeid = dir_nodeid;
1018  	r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
1019  	kref_init(&r->res_ref);
1020  
1021  	write_lock_bh(&ls->ls_rsbtbl_lock);
1022  	error = rsb_insert(r, &ls->ls_rsbtbl);
1023  	if (error == -EEXIST) {
1024  		/* somebody else was faster and it seems the
1025  		 * rsb exists now, we do a whole relookup
1026  		 */
1027  		write_unlock_bh(&ls->ls_rsbtbl_lock);
1028  		dlm_free_rsb(r);
1029  		goto retry;
1030  	} else if (!error) {
1031  		list_add(&r->res_slow_list, &ls->ls_slow_active);
1032  	}
1033  	write_unlock_bh(&ls->ls_rsbtbl_lock);
1034  
1035   out:
1036  	*r_ret = r;
1037  	return error;
1038  }
1039  
1040  /*
1041   * rsb rcu usage
1042   *
1043   * While rcu read lock is held, the rsb cannot be freed,
1044   * which allows a lookup optimization.
1045   *
1046   * Two threads are accessing the same rsb concurrently,
1047   * the first (A) is trying to use the rsb, the second (B)
1048   * is trying to free the rsb.
1049   *
1050   * thread A                 thread B
1051   * (trying to use rsb)      (trying to free rsb)
1052   *
1053   * A1. rcu read lock
1054   * A2. rsbtbl read lock
1055   * A3. look up rsb in rsbtbl
1056   * A4. rsbtbl read unlock
1057   *                          B1. rsbtbl write lock
1058   *                          B2. look up rsb in rsbtbl
1059   *                          B3. remove rsb from rsbtbl
1060   *                          B4. clear rsb HASHED flag
1061   *                          B5. rsbtbl write unlock
1062   *                          B6. begin freeing rsb using rcu...
1063   *
1064   * (rsb is inactive, so try to make it active again)
1065   * A5. read rsb HASHED flag (safe because rsb is not freed yet)
1066   * A6. the rsb HASHED flag is not set, which means the rsb
1067   *     is being removed from rsbtbl and freed, so don't use it.
1068   * A7. rcu read unlock
1069   *
1070   *                          B7. ...finish freeing rsb using rcu
1071   * A8. create a new rsb
1072   *
1073   * Without the rcu optimization, steps A5-8 would need to do
1074   * an extra rsbtbl lookup:
1075   * A5. rsbtbl write lock
1076   * A6. look up rsb in rsbtbl, not found
1077   * A7. rsbtbl write unlock
1078   * A8. create a new rsb
1079   */
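/*
 * The same pattern, reduced to a sketch (obj, obj_is_hashed and tbl_lock
 * are placeholder names for the rsb, RSB_HASHED and ls_rsbtbl_lock used
 * below):
 *
 *	rcu_read_lock();
 *	obj = rhashtable_lookup_fast(&tbl, &key, params);	// A3
 *	if (obj) {
 *		write_lock_bh(&tbl_lock);
 *		if (obj_is_hashed(obj)) {
 *			// still in the table: safe to take a ref or
 *			// reactivate it
 *		} else {
 *			// concurrently removed (B3/B4): don't use it,
 *			// fall back to creating a new object (A8)
 *		}
 *		write_unlock_bh(&tbl_lock);
 *	}
 *	rcu_read_unlock();
 */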
1080  
1081  static int find_rsb(struct dlm_ls *ls, const void *name, int len,
1082  		    int from_nodeid, unsigned int flags,
1083  		    struct dlm_rsb **r_ret)
1084  {
1085  	int dir_nodeid;
1086  	uint32_t hash;
1087  	int rv;
1088  
1089  	if (len > DLM_RESNAME_MAXLEN)
1090  		return -EINVAL;
1091  
1092  	hash = jhash(name, len, 0);
1093  	dir_nodeid = dlm_hash2nodeid(ls, hash);
1094  
1095  	rcu_read_lock();
1096  	if (dlm_no_directory(ls))
1097  		rv = find_rsb_nodir(ls, name, len, hash, dir_nodeid,
1098  				      from_nodeid, flags, r_ret);
1099  	else
1100  		rv = find_rsb_dir(ls, name, len, hash, dir_nodeid,
1101  				    from_nodeid, flags, r_ret);
1102  	rcu_read_unlock();
1103  	return rv;
1104  }
1105  
1106  /* we have received a request and found that res_master_nodeid != our_nodeid,
1107     so we need to return an error or make ourself the master */
1108  
1109  static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
1110  				  int from_nodeid)
1111  {
1112  	if (dlm_no_directory(ls)) {
1113  		log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
1114  			  from_nodeid, r->res_master_nodeid,
1115  			  r->res_dir_nodeid);
1116  		dlm_print_rsb(r);
1117  		return -ENOTBLK;
1118  	}
1119  
1120  	if (from_nodeid != r->res_dir_nodeid) {
1121  		/* our rsb is not master, and another node (not the dir node)
1122  	   	   has sent us a request.  this is much more common when our
1123  	   	   master_nodeid is zero, so limit debug to non-zero.  */
1124  
1125  		if (r->res_master_nodeid) {
1126  			log_debug(ls, "validate master from_other %d master %d "
1127  				  "dir %d first %x %s", from_nodeid,
1128  				  r->res_master_nodeid, r->res_dir_nodeid,
1129  				  r->res_first_lkid, r->res_name);
1130  		}
1131  		return -ENOTBLK;
1132  	} else {
1133  		/* our rsb is not master, but the dir nodeid has sent us a
1134  	   	   request; this could happen with master 0 / res_nodeid -1 */
1135  
1136  		if (r->res_master_nodeid) {
1137  			log_error(ls, "validate master from_dir %d master %d "
1138  				  "first %x %s",
1139  				  from_nodeid, r->res_master_nodeid,
1140  				  r->res_first_lkid, r->res_name);
1141  		}
1142  
1143  		r->res_master_nodeid = dlm_our_nodeid();
1144  		r->res_nodeid = 0;
1145  		return 0;
1146  	}
1147  }
1148  
1149  static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
1150  				int from_nodeid, bool is_inactive, unsigned int flags,
1151  				int *r_nodeid, int *result)
1152  {
1153  	int fix_master = (flags & DLM_LU_RECOVER_MASTER);
1154  	int from_master = (flags & DLM_LU_RECOVER_DIR);
1155  
1156  	if (r->res_dir_nodeid != our_nodeid) {
1157  		/* should not happen, but may as well fix it and carry on */
1158  		log_error(ls, "%s res_dir %d our %d %s", __func__,
1159  			  r->res_dir_nodeid, our_nodeid, r->res_name);
1160  		r->res_dir_nodeid = our_nodeid;
1161  	}
1162  
1163  	if (fix_master && r->res_master_nodeid && dlm_is_removed(ls, r->res_master_nodeid)) {
1164  		/* Recovery uses this function to set a new master when
1165  		 * the previous master failed.  Setting NEW_MASTER will
1166  		 * force dlm_recover_masters to call recover_master on this
1167  		 * rsb even though the res_nodeid is no longer removed.
1168  		 */
1169  
1170  		r->res_master_nodeid = from_nodeid;
1171  		r->res_nodeid = from_nodeid;
1172  		rsb_set_flag(r, RSB_NEW_MASTER);
1173  
1174  		if (is_inactive) {
1175  			/* I don't think we should ever find it inactive. */
1176  			log_error(ls, "%s fix_master inactive", __func__);
1177  			dlm_dump_rsb(r);
1178  		}
1179  	}
1180  
1181  	if (from_master && (r->res_master_nodeid != from_nodeid)) {
1182  		/* this will happen if from_nodeid became master during
1183  		 * a previous recovery cycle, and we aborted the previous
1184  		 * cycle before recovering this master value
1185  		 */
1186  
1187  		log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
1188  			  __func__, from_nodeid, r->res_master_nodeid,
1189  			  r->res_nodeid, r->res_first_lkid, r->res_name);
1190  
1191  		if (r->res_master_nodeid == our_nodeid) {
1192  			log_error(ls, "from_master %d our_master", from_nodeid);
1193  			dlm_dump_rsb(r);
1194  			goto ret_assign;
1195  		}
1196  
1197  		r->res_master_nodeid = from_nodeid;
1198  		r->res_nodeid = from_nodeid;
1199  		rsb_set_flag(r, RSB_NEW_MASTER);
1200  	}
1201  
1202  	if (!r->res_master_nodeid) {
1203  		/* this will happen if recovery happens while we're looking
1204  		 * up the master for this rsb
1205  		 */
1206  
1207  		log_debug(ls, "%s master 0 to %d first %x %s", __func__,
1208  			  from_nodeid, r->res_first_lkid, r->res_name);
1209  		r->res_master_nodeid = from_nodeid;
1210  		r->res_nodeid = from_nodeid;
1211  	}
1212  
1213  	if (!from_master && !fix_master &&
1214  	    (r->res_master_nodeid == from_nodeid)) {
1215  		/* this can happen when the master sends remove, the dir node
1216  		 * finds the rsb on the active list and ignores the remove,
1217  		 * and the former master sends a lookup
1218  		 */
1219  
1220  		log_limit(ls, "%s from master %d flags %x first %x %s",
1221  			  __func__, from_nodeid, flags, r->res_first_lkid,
1222  			  r->res_name);
1223  	}
1224  
1225   ret_assign:
1226  	*r_nodeid = r->res_master_nodeid;
1227  	if (result)
1228  		*result = DLM_LU_MATCH;
1229  }
1230  
1231  /*
1232   * We're the dir node for this res and another node wants to know the
1233   * master nodeid.  During normal operation (non recovery) this is only
1234   * called from receive_lookup(); master lookups when the local node is
1235   * the dir node are done by find_rsb().
1236   *
1237   * normal operation, we are the dir node for a resource
1238   * . _request_lock
1239   * . set_master
1240   * . send_lookup
1241   * . receive_lookup
1242   * . dlm_master_lookup flags 0
1243   *
1244   * recover directory, we are rebuilding dir for all resources
1245   * . dlm_recover_directory
1246   * . dlm_rcom_names
1247   *   remote node sends back the rsb names it is master of and we are dir of
1248   * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
1249   *   we either create new rsb setting remote node as master, or find existing
1250   *   rsb and set master to be the remote node.
1251   *
1252   * recover masters, we are finding the new master for resources
1253   * . dlm_recover_masters
1254   * . recover_master
1255   * . dlm_send_rcom_lookup
1256   * . receive_rcom_lookup
1257   * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
1258   */
1259  
1260  static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
1261  			      int len, unsigned int flags, int *r_nodeid, int *result)
1262  {
1263  	struct dlm_rsb *r = NULL;
1264  	uint32_t hash;
1265  	int our_nodeid = dlm_our_nodeid();
1266  	int dir_nodeid, error;
1267  
1268  	if (len > DLM_RESNAME_MAXLEN)
1269  		return -EINVAL;
1270  
1271  	if (from_nodeid == our_nodeid) {
1272  		log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
1273  			  our_nodeid, flags);
1274  		return -EINVAL;
1275  	}
1276  
1277  	hash = jhash(name, len, 0);
1278  	dir_nodeid = dlm_hash2nodeid(ls, hash);
1279  	if (dir_nodeid != our_nodeid) {
1280  		log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
1281  			  from_nodeid, dir_nodeid, our_nodeid, hash,
1282  			  ls->ls_num_nodes);
1283  		*r_nodeid = -1;
1284  		return -EINVAL;
1285  	}
1286  
1287   retry:
1288  	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1289  	if (error)
1290  		goto not_found;
1291  
1292  	/* check if the rsb is active under read lock - likely path */
1293  	read_lock_bh(&ls->ls_rsbtbl_lock);
1294  	if (!rsb_flag(r, RSB_HASHED)) {
1295  		read_unlock_bh(&ls->ls_rsbtbl_lock);
1296  		goto not_found;
1297  	}
1298  
1299  	if (rsb_flag(r, RSB_INACTIVE)) {
1300  		read_unlock_bh(&ls->ls_rsbtbl_lock);
1301  		goto do_inactive;
1302  	}
1303  
1304  	/* because the rsb is active, we need to lock_rsb before
1305  	 * checking/changing res_master_nodeid
1306  	 */
1307  
1308  	hold_rsb(r);
1309  	read_unlock_bh(&ls->ls_rsbtbl_lock);
1310  	lock_rsb(r);
1311  
1312  	__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
1313  			    flags, r_nodeid, result);
1314  
1315  	/* the rsb was active */
1316  	unlock_rsb(r);
1317  	put_rsb(r);
1318  
1319  	return 0;
1320  
1321   do_inactive:
1322  	/* unlikely path - check if still part of ls_rsbtbl */
1323  	write_lock_bh(&ls->ls_rsbtbl_lock);
1324  
1325  	/* see comment in find_rsb_dir */
1326  	if (rsb_flag(r, RSB_HASHED)) {
1327  		if (!rsb_flag(r, RSB_INACTIVE)) {
1328  			write_unlock_bh(&ls->ls_rsbtbl_lock);
1329  			/* something has changed, very unlikely but
1330  			 * try again
1331  			 */
1332  			goto retry;
1333  		}
1334  	} else {
1335  		write_unlock_bh(&ls->ls_rsbtbl_lock);
1336  		goto not_found;
1337  	}
1338  
1339  	/* because the rsb is inactive, it's not refcounted and lock_rsb
1340  	   is not used, but is protected by the rsbtbl lock */
1341  
1342  	__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
1343  			    r_nodeid, result);
1344  
1345  	/* A dir record rsb should never be on scan list.
1346  	 * Except when we are the dir and master node.
1347  	 * This function should only be called by the dir
1348  	 * node.
1349  	 */
1350  	WARN_ON(!list_empty(&r->res_scan_list) &&
1351  		r->res_master_nodeid != our_nodeid);
1352  
1353  	write_unlock_bh(&ls->ls_rsbtbl_lock);
1354  
1355  	return 0;
1356  
1357   not_found:
1358  	error = get_rsb_struct(ls, name, len, &r);
1359  	if (WARN_ON_ONCE(error))
1360  		goto out;
1361  
1362  	r->res_hash = hash;
1363  	r->res_dir_nodeid = our_nodeid;
1364  	r->res_master_nodeid = from_nodeid;
1365  	r->res_nodeid = from_nodeid;
1366  	rsb_set_flag(r, RSB_INACTIVE);
1367  
1368  	write_lock_bh(&ls->ls_rsbtbl_lock);
1369  	error = rsb_insert(r, &ls->ls_rsbtbl);
1370  	if (error == -EEXIST) {
1371  		/* somebody else was faster and it seems the
1372  		 * rsb exists now, we do a whole relookup
1373  		 */
1374  		write_unlock_bh(&ls->ls_rsbtbl_lock);
1375  		dlm_free_rsb(r);
1376  		goto retry;
1377  	} else if (error) {
1378  		write_unlock_bh(&ls->ls_rsbtbl_lock);
1379  		/* should never happen */
1380  		dlm_free_rsb(r);
1381  		goto retry;
1382  	}
1383  
1384  	list_add(&r->res_slow_list, &ls->ls_slow_inactive);
1385  	write_unlock_bh(&ls->ls_rsbtbl_lock);
1386  
1387  	if (result)
1388  		*result = DLM_LU_ADD;
1389  	*r_nodeid = from_nodeid;
1390   out:
1391  	return error;
1392  }
1393  
1394  int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
1395  		      int len, unsigned int flags, int *r_nodeid, int *result)
1396  {
1397  	int rv;
1398  	rcu_read_lock();
1399  	rv = _dlm_master_lookup(ls, from_nodeid, name, len, flags, r_nodeid, result);
1400  	rcu_read_unlock();
1401  	return rv;
1402  }
1403  
1404  static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1405  {
1406  	struct dlm_rsb *r;
1407  
1408  	read_lock_bh(&ls->ls_rsbtbl_lock);
1409  	list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
1410  		if (r->res_hash == hash)
1411  			dlm_dump_rsb(r);
1412  	}
1413  	read_unlock_bh(&ls->ls_rsbtbl_lock);
1414  }
1415  
1416  void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
1417  {
1418  	struct dlm_rsb *r = NULL;
1419  	int error;
1420  
1421  	rcu_read_lock();
1422  	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1423  	if (error)
1424  		goto out;
1425  
1426  	dlm_dump_rsb(r);
1427   out:
1428  	rcu_read_unlock();
1429  }
1430  
1431  static void deactivate_rsb(struct kref *kref)
1432  {
1433  	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1434  	struct dlm_ls *ls = r->res_ls;
1435  	int our_nodeid = dlm_our_nodeid();
1436  
1437  	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1438  	rsb_set_flag(r, RSB_INACTIVE);
1439  	list_move(&r->res_slow_list, &ls->ls_slow_inactive);
1440  
1441  	/*
1442  	 * When the rsb becomes unused, there are two possibilities:
1443  	 * 1. Leave the inactive rsb in place (don't remove it).
1444  	 * 2. Add it to the scan list to be removed.
1445  	 *
1446  	 * 1 is done when the rsb is acting as the dir record
1447  	 * for a remotely mastered rsb.  The rsb must be left
1448  	 * in place as an inactive rsb to act as the dir record.
1449  	 *
1450  	 * 2 is done when a) the rsb is not the master and not the
1451  	 * dir record, b) when the rsb is both the master and the
1452  	 * dir record, c) when the rsb is master but not dir record.
1453  	 *
1454  	 * (If no directory is used, the rsb can always be removed.)
1455  	 */
1456  	if (dlm_no_directory(ls) ||
1457  	    (r->res_master_nodeid == our_nodeid ||
1458  	     dlm_dir_nodeid(r) != our_nodeid))
1459  		add_scan(ls, r);
1460  
1461  	if (r->res_lvbptr) {
1462  		dlm_free_lvb(r->res_lvbptr);
1463  		r->res_lvbptr = NULL;
1464  	}
1465  }
1466  
1467  void free_inactive_rsb(struct dlm_rsb *r)
1468  {
1469  	WARN_ON_ONCE(!rsb_flag(r, RSB_INACTIVE));
1470  
1471  	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1472  	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1473  	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1474  	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1475  	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1476  	DLM_ASSERT(list_empty(&r->res_scan_list), dlm_dump_rsb(r););
1477  	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1478  	DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r););
1479  
1480  	dlm_free_rsb(r);
1481  }
1482  
1483  /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1484     The rsb must exist as long as any lkb's for it do. */
1485  
1486  static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1487  {
1488  	hold_rsb(r);
1489  	lkb->lkb_resource = r;
1490  }
1491  
1492  static void detach_lkb(struct dlm_lkb *lkb)
1493  {
1494  	if (lkb->lkb_resource) {
1495  		put_rsb(lkb->lkb_resource);
1496  		lkb->lkb_resource = NULL;
1497  	}
1498  }
1499  
1500  static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
1501  		       unsigned long start, unsigned long end)
1502  {
1503  	struct xa_limit limit;
1504  	struct dlm_lkb *lkb;
1505  	int rv;
1506  
1507  	limit.max = end;
1508  	limit.min = start;
1509  
1510  	lkb = dlm_allocate_lkb();
1511  	if (!lkb)
1512  		return -ENOMEM;
1513  
1514  	lkb->lkb_last_bast_cb_mode = DLM_LOCK_IV;
1515  	lkb->lkb_last_cast_cb_mode = DLM_LOCK_IV;
1516  	lkb->lkb_last_cb_mode = DLM_LOCK_IV;
1517  	lkb->lkb_nodeid = -1;
1518  	lkb->lkb_grmode = DLM_LOCK_IV;
1519  	kref_init(&lkb->lkb_ref);
1520  	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1521  	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1522  
1523  	write_lock_bh(&ls->ls_lkbxa_lock);
1524  	rv = xa_alloc(&ls->ls_lkbxa, &lkb->lkb_id, lkb, limit, GFP_ATOMIC);
1525  	write_unlock_bh(&ls->ls_lkbxa_lock);
1526  
1527  	if (rv < 0) {
1528  		log_error(ls, "create_lkb xa error %d", rv);
1529  		dlm_free_lkb(lkb);
1530  		return rv;
1531  	}
1532  
1533  	*lkb_ret = lkb;
1534  	return 0;
1535  }
1536  
1537  static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1538  {
1539  	return _create_lkb(ls, lkb_ret, 1, ULONG_MAX);
1540  }
1541  
1542  static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1543  {
1544  	struct dlm_lkb *lkb;
1545  
1546  	rcu_read_lock();
1547  	lkb = xa_load(&ls->ls_lkbxa, lkid);
1548  	if (lkb) {
1549  		/* check if lkb is still part of lkbxa under lkbxa_lock as
1550  		 * the lkb_ref is tight to the lkbxa data structure, see
1551  		 * __put_lkb().
1552  		 */
1553  		read_lock_bh(&ls->ls_lkbxa_lock);
1554  		if (kref_read(&lkb->lkb_ref))
1555  			kref_get(&lkb->lkb_ref);
1556  		else
1557  			lkb = NULL;
1558  		read_unlock_bh(&ls->ls_lkbxa_lock);
1559  	}
1560  	rcu_read_unlock();
1561  
1562  	*lkb_ret = lkb;
1563  	return lkb ? 0 : -ENOENT;
1564  }
1565  
1566  static void kill_lkb(struct kref *kref)
1567  {
1568  	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1569  
1570  	/* All work is done after the return from kref_put() so we
1571  	   can release the write_lock before the detach_lkb */
1572  
1573  	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1574  }
1575  
1576  /* __put_lkb() is used when an lkb may not have an rsb attached to
1577     it so we need to provide the lockspace explicitly */
1578  
1579  static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1580  {
1581  	uint32_t lkid = lkb->lkb_id;
1582  	int rv;
1583  
1584  	rv = dlm_kref_put_write_lock_bh(&lkb->lkb_ref, kill_lkb,
1585  					&ls->ls_lkbxa_lock);
1586  	if (rv) {
1587  		xa_erase(&ls->ls_lkbxa, lkid);
1588  		write_unlock_bh(&ls->ls_lkbxa_lock);
1589  
1590  		detach_lkb(lkb);
1591  
1592  		/* for local/process lkbs, lvbptr points to caller's lksb */
1593  		if (lkb->lkb_lvbptr && is_master_copy(lkb))
1594  			dlm_free_lvb(lkb->lkb_lvbptr);
1595  		dlm_free_lkb(lkb);
1596  	}
1597  
1598  	return rv;
1599  }
1600  
1601  int dlm_put_lkb(struct dlm_lkb *lkb)
1602  {
1603  	struct dlm_ls *ls;
1604  
1605  	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1606  	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1607  
1608  	ls = lkb->lkb_resource->res_ls;
1609  	return __put_lkb(ls, lkb);
1610  }
1611  
1612  /* This is only called to add a reference when the code already holds
1613     a valid reference to the lkb, so there's no need for locking. */
1614  
1615  static inline void hold_lkb(struct dlm_lkb *lkb)
1616  {
1617  	kref_get(&lkb->lkb_ref);
1618  }
1619  
1620  static void unhold_lkb_assert(struct kref *kref)
1621  {
1622  	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1623  
1624  	DLM_ASSERT(false, dlm_print_lkb(lkb););
1625  }
1626  
1627  /* This is called when we need to remove a reference and are certain
1628     it's not the last ref.  e.g. del_lkb is always called between a
1629     find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1630     put_lkb would work fine, but would involve unnecessary locking */
1631  
1632  static inline void unhold_lkb(struct dlm_lkb *lkb)
1633  {
1634  	kref_put(&lkb->lkb_ref, unhold_lkb_assert);
1635  }
1636  
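/* Insert "new" ahead of the first entry with a lower requested mode so
   the queue stays ordered from highest to lowest mode; append at the
   tail if no lower-mode entry is found. */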
1637  static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1638  			    int mode)
1639  {
1640  	struct dlm_lkb *lkb = NULL, *iter;
1641  
1642  	list_for_each_entry(iter, head, lkb_statequeue)
1643  		if (iter->lkb_rqmode < mode) {
1644  			lkb = iter;
1645  			list_add_tail(new, &iter->lkb_statequeue);
1646  			break;
1647  		}
1648  
1649  	if (!lkb)
1650  		list_add_tail(new, head);
1651  }
1652  
1653  /* add/remove lkb to rsb's grant/convert/wait queue */
1654  
1655  static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1656  {
1657  	kref_get(&lkb->lkb_ref);
1658  
1659  	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1660  
1661  	lkb->lkb_timestamp = ktime_get();
1662  
1663  	lkb->lkb_status = status;
1664  
1665  	switch (status) {
1666  	case DLM_LKSTS_WAITING:
1667  		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1668  			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1669  		else
1670  			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1671  		break;
1672  	case DLM_LKSTS_GRANTED:
1673  		/* convention says granted locks kept in order of grmode */
1674  		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1675  				lkb->lkb_grmode);
1676  		break;
1677  	case DLM_LKSTS_CONVERT:
1678  		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1679  			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1680  		else
1681  			list_add_tail(&lkb->lkb_statequeue,
1682  				      &r->res_convertqueue);
1683  		break;
1684  	default:
1685  		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1686  	}
1687  }
1688  
1689  static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1690  {
1691  	lkb->lkb_status = 0;
1692  	list_del(&lkb->lkb_statequeue);
1693  	unhold_lkb(lkb);
1694  }
1695  
1696  static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1697  {
1698  	del_lkb(r, lkb);
1699  	add_lkb(r, lkb, sts);
1700  }
1701  
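/* Map a request message type to its corresponding reply type; returns -1
   for message types that have no reply. */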
1702  static int msg_reply_type(int mstype)
1703  {
1704  	switch (mstype) {
1705  	case DLM_MSG_REQUEST:
1706  		return DLM_MSG_REQUEST_REPLY;
1707  	case DLM_MSG_CONVERT:
1708  		return DLM_MSG_CONVERT_REPLY;
1709  	case DLM_MSG_UNLOCK:
1710  		return DLM_MSG_UNLOCK_REPLY;
1711  	case DLM_MSG_CANCEL:
1712  		return DLM_MSG_CANCEL_REPLY;
1713  	case DLM_MSG_LOOKUP:
1714  		return DLM_MSG_LOOKUP_REPLY;
1715  	}
1716  	return -1;
1717  }
1718  
1719  /* add/remove lkb from global waiters list of lkb's waiting for
1720     a reply from a remote node */
1721  
1722  static void add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1723  {
1724  	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1725  
1726  	spin_lock_bh(&ls->ls_waiters_lock);
1727  	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1728  		switch (mstype) {
1729  		case DLM_MSG_UNLOCK:
1730  			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
1731  			break;
1732  		case DLM_MSG_CANCEL:
1733  			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
1734  			break;
1735  		default:
1736  			/* should never happen as validate_lock_args() checks
1737  			 * on lkb_wait_type and validate_unlock_args() only
1738  			 * creates UNLOCK or CANCEL messages.
1739  			 */
1740  			WARN_ON_ONCE(1);
1741  			goto out;
1742  		}
1743  		lkb->lkb_wait_count++;
1744  		hold_lkb(lkb);
1745  
1746  		log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1747  			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
1748  			  lkb->lkb_wait_count, dlm_iflags_val(lkb));
1749  		goto out;
1750  	}
1751  
1752  	DLM_ASSERT(!lkb->lkb_wait_count,
1753  		   dlm_print_lkb(lkb);
1754  		   printk("wait_count %d\n", lkb->lkb_wait_count););
1755  
1756  	lkb->lkb_wait_count++;
1757  	lkb->lkb_wait_type = mstype;
1758  	lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1759  	hold_lkb(lkb);
1760  	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1761   out:
1762  	spin_unlock_bh(&ls->ls_waiters_lock);
1763  }
1764  
1765  /* We clear the RESEND flag because we might be taking an lkb off the waiters
1766     list as part of process_requestqueue (e.g. a lookup that has an optimized
1767     request reply on the requestqueue) between dlm_recover_waiters_pre() which
1768     set RESEND and dlm_recover_waiters_post() */
1769  
1770  static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1771  				const struct dlm_message *ms)
1772  {
1773  	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1774  	int overlap_done = 0;
1775  
1776  	if (mstype == DLM_MSG_UNLOCK_REPLY &&
1777  	    test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
1778  		log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1779  		overlap_done = 1;
1780  		goto out_del;
1781  	}
1782  
1783  	if (mstype == DLM_MSG_CANCEL_REPLY &&
1784  	    test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1785  		log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1786  		overlap_done = 1;
1787  		goto out_del;
1788  	}
1789  
1790  	/* Cancel state was preemptively cleared by a successful convert,
1791  	   see next comment, nothing to do. */
1792  
1793  	if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1794  	    (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1795  		log_debug(ls, "remwait %x cancel_reply wait_type %d",
1796  			  lkb->lkb_id, lkb->lkb_wait_type);
1797  		return -1;
1798  	}
1799  
1800  	/* Remove for the convert reply, and preemptively remove for the
1801  	   cancel reply.  A convert has been granted while there's still
1802  	   an outstanding cancel on it (the cancel is moot and the result
1803  	   in the cancel reply should be 0).  We preempt the cancel reply
1804  	   because the app gets the convert result and then can follow up
1805  	   with another op, like convert.  This subsequent op would see the
1806  	   lingering state of the cancel and fail with -EBUSY. */
1807  
1808  	if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1809  	    (lkb->lkb_wait_type == DLM_MSG_CONVERT) && ms && !ms->m_result &&
1810  	    test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1811  		log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1812  			  lkb->lkb_id);
1813  		lkb->lkb_wait_type = 0;
1814  		lkb->lkb_wait_count--;
1815  		unhold_lkb(lkb);
1816  		goto out_del;
1817  	}
1818  
1819  	/* N.B. type of reply may not always correspond to type of original
1820  	   msg due to lookup->request optimization, verify others? */
1821  
1822  	if (lkb->lkb_wait_type) {
1823  		lkb->lkb_wait_type = 0;
1824  		goto out_del;
1825  	}
1826  
1827  	log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1828  		  lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
1829  		  lkb->lkb_remid, mstype, dlm_iflags_val(lkb));
1830  	return -1;
1831  
1832   out_del:
1833  	/* the force-unlock/cancel has completed and we haven't recvd a reply
1834  	   to the op that was in progress prior to the unlock/cancel; we
1835  	   give up on any reply to the earlier op.  FIXME: not sure when/how
1836  	   this would happen */
1837  
1838  	if (overlap_done && lkb->lkb_wait_type) {
1839  		log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1840  			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
1841  		lkb->lkb_wait_count--;
1842  		unhold_lkb(lkb);
1843  		lkb->lkb_wait_type = 0;
1844  	}
1845  
1846  	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1847  
1848  	clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
1849  	lkb->lkb_wait_count--;
1850  	if (!lkb->lkb_wait_count)
1851  		list_del_init(&lkb->lkb_wait_reply);
1852  	unhold_lkb(lkb);
1853  	return 0;
1854  }
1855  
1856  static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1857  {
1858  	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1859  	int error;
1860  
1861  	spin_lock_bh(&ls->ls_waiters_lock);
1862  	error = _remove_from_waiters(lkb, mstype, NULL);
1863  	spin_unlock_bh(&ls->ls_waiters_lock);
1864  	return error;
1865  }
1866  
1867  /* Handles situations where we might be processing a "fake" or "local" reply in
1868   * the recovery context, which stops any locking activity. Only debugfs might
1869   * change the lockspace waiters, but it holds the recovery lock to ensure that
1870   * remove_from_waiters_ms() in the local case is the only user manipulating the
1871   * lockspace waiters in the recovery context.
1872   */
1873  
1874  static int remove_from_waiters_ms(struct dlm_lkb *lkb,
1875  				  const struct dlm_message *ms, bool local)
1876  {
1877  	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1878  	int error;
1879  
1880  	if (!local)
1881  		spin_lock_bh(&ls->ls_waiters_lock);
1882  	else
1883  		WARN_ON_ONCE(!rwsem_is_locked(&ls->ls_in_recovery) ||
1884  			     !dlm_locking_stopped(ls));
1885  	error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
1886  	if (!local)
1887  		spin_unlock_bh(&ls->ls_waiters_lock);
1888  	return error;
1889  }
1890  
1891  /* lkb is master or local copy */
1892  
1893  static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1894  {
1895  	int b, len = r->res_ls->ls_lvblen;
1896  
1897  	/* b=1 lvb returned to caller
1898  	   b=0 lvb written to rsb or invalidated
1899  	   b=-1 do nothing */
1900  
1901  	b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1902  
1903  	if (b == 1) {
1904  		if (!lkb->lkb_lvbptr)
1905  			return;
1906  
1907  		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1908  			return;
1909  
1910  		if (!r->res_lvbptr)
1911  			return;
1912  
1913  		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1914  		lkb->lkb_lvbseq = r->res_lvbseq;
1915  
1916  	} else if (b == 0) {
1917  		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1918  			rsb_set_flag(r, RSB_VALNOTVALID);
1919  			return;
1920  		}
1921  
1922  		if (!lkb->lkb_lvbptr)
1923  			return;
1924  
1925  		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1926  			return;
1927  
1928  		if (!r->res_lvbptr)
1929  			r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1930  
1931  		if (!r->res_lvbptr)
1932  			return;
1933  
1934  		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1935  		r->res_lvbseq++;
1936  		lkb->lkb_lvbseq = r->res_lvbseq;
1937  		rsb_clear_flag(r, RSB_VALNOTVALID);
1938  	}
1939  
1940  	if (rsb_flag(r, RSB_VALNOTVALID))
1941  		set_bit(DLM_SBF_VALNOTVALID_BIT, &lkb->lkb_sbflags);
1942  }
1943  
1944  static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1945  {
1946  	if (lkb->lkb_grmode < DLM_LOCK_PW)
1947  		return;
1948  
1949  	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1950  		rsb_set_flag(r, RSB_VALNOTVALID);
1951  		return;
1952  	}
1953  
1954  	if (!lkb->lkb_lvbptr)
1955  		return;
1956  
1957  	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1958  		return;
1959  
1960  	if (!r->res_lvbptr)
1961  		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1962  
1963  	if (!r->res_lvbptr)
1964  		return;
1965  
1966  	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1967  	r->res_lvbseq++;
1968  	rsb_clear_flag(r, RSB_VALNOTVALID);
1969  }
1970  
1971  /* lkb is process copy (pc) */
1972  
1973  static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1974  			    const struct dlm_message *ms)
1975  {
1976  	int b;
1977  
1978  	if (!lkb->lkb_lvbptr)
1979  		return;
1980  
1981  	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1982  		return;
1983  
1984  	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1985  	if (b == 1) {
1986  		int len = receive_extralen(ms);
1987  		if (len > r->res_ls->ls_lvblen)
1988  			len = r->res_ls->ls_lvblen;
1989  		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1990  		lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
1991  	}
1992  }
1993  
1994  /* Manipulate lkb's on rsb's convert/granted/waiting queues
1995     remove_lock -- used for unlock, removes lkb from granted
1996     revert_lock -- used for cancel, moves lkb from convert to granted
1997     grant_lock  -- used for request and convert, adds lkb to granted or
1998                    moves lkb from convert or waiting to granted
1999  
2000     Each of these is used for master or local copy lkb's.  There is
2001     also a _pc() variation used to make the corresponding change on
2002     a process copy (pc) lkb. */
2003  
2004  static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2005  {
2006  	del_lkb(r, lkb);
2007  	lkb->lkb_grmode = DLM_LOCK_IV;
2008  	/* this unhold undoes the original ref from create_lkb()
2009  	   so this leads to the lkb being freed */
2010  	unhold_lkb(lkb);
2011  }
2012  
2013  static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2014  {
2015  	set_lvb_unlock(r, lkb);
2016  	_remove_lock(r, lkb);
2017  }
2018  
2019  static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2020  {
2021  	_remove_lock(r, lkb);
2022  }
2023  
2024  /* returns: 0 did nothing
2025  	    1 moved lock to granted
2026  	   -1 removed lock */
2027  
2028  static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2029  {
2030  	int rv = 0;
2031  
2032  	lkb->lkb_rqmode = DLM_LOCK_IV;
2033  
2034  	switch (lkb->lkb_status) {
2035  	case DLM_LKSTS_GRANTED:
2036  		break;
2037  	case DLM_LKSTS_CONVERT:
2038  		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2039  		rv = 1;
2040  		break;
2041  	case DLM_LKSTS_WAITING:
2042  		del_lkb(r, lkb);
2043  		lkb->lkb_grmode = DLM_LOCK_IV;
2044  		/* this unhold undoes the original ref from create_lkb()
2045  		   so this leads to the lkb being freed */
2046  		unhold_lkb(lkb);
2047  		rv = -1;
2048  		break;
2049  	default:
2050  		log_print("invalid status for revert %d", lkb->lkb_status);
2051  	}
2052  	return rv;
2053  }
2054  
2055  static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2056  {
2057  	return revert_lock(r, lkb);
2058  }
2059  
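/* If the requested mode differs from the granted mode, make it the new
   granted mode and move (or add) the lkb to the granted queue; then
   reset rqmode to IV and clear highbast. */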
2060  static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2061  {
2062  	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2063  		lkb->lkb_grmode = lkb->lkb_rqmode;
2064  		if (lkb->lkb_status)
2065  			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2066  		else
2067  			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2068  	}
2069  
2070  	lkb->lkb_rqmode = DLM_LOCK_IV;
2071  	lkb->lkb_highbast = 0;
2072  }
2073  
2074  static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2075  {
2076  	set_lvb_lock(r, lkb);
2077  	_grant_lock(r, lkb);
2078  }
2079  
2080  static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2081  			  const struct dlm_message *ms)
2082  {
2083  	set_lvb_lock_pc(r, lkb, ms);
2084  	_grant_lock(r, lkb);
2085  }
2086  
2087  /* called by grant_pending_locks() which means an async grant message must
2088     be sent to the requesting node in addition to granting the lock if the
2089     lkb belongs to a remote node. */
2090  
2091  static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
2092  {
2093  	grant_lock(r, lkb);
2094  	if (is_master_copy(lkb))
2095  		send_grant(r, lkb);
2096  	else
2097  		queue_cast(r, lkb, 0);
2098  }
2099  
2100  /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2101     change the granted/requested modes.  We're munging things accordingly in
2102     the process copy.
2103     CONVDEADLK: our grmode may have been forced down to NL to resolve a
2104     conversion deadlock
2105     ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2106     compatible with other granted locks */
2107  
2108  static void munge_demoted(struct dlm_lkb *lkb)
2109  {
2110  	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2111  		log_print("munge_demoted %x invalid modes gr %d rq %d",
2112  			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2113  		return;
2114  	}
2115  
2116  	lkb->lkb_grmode = DLM_LOCK_NL;
2117  }
2118  
2119  static void munge_altmode(struct dlm_lkb *lkb, const struct dlm_message *ms)
2120  {
2121  	if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
2122  	    ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
2123  		log_print("munge_altmode %x invalid reply type %d",
2124  			  lkb->lkb_id, le32_to_cpu(ms->m_type));
2125  		return;
2126  	}
2127  
2128  	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2129  		lkb->lkb_rqmode = DLM_LOCK_PR;
2130  	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2131  		lkb->lkb_rqmode = DLM_LOCK_CW;
2132  	else {
2133  		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2134  		dlm_print_lkb(lkb);
2135  	}
2136  }
2137  
2138  static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2139  {
2140  	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2141  					   lkb_statequeue);
2142  	if (lkb->lkb_id == first->lkb_id)
2143  		return 1;
2144  
2145  	return 0;
2146  }
2147  
2148  /* Check if the given lkb conflicts with another lkb on the queue. */
2149  
2150  static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2151  {
2152  	struct dlm_lkb *this;
2153  
2154  	list_for_each_entry(this, head, lkb_statequeue) {
2155  		if (this == lkb)
2156  			continue;
2157  		if (!modes_compat(this, lkb))
2158  			return 1;
2159  	}
2160  	return 0;
2161  }
2162  
2163  /*
2164   * "A conversion deadlock arises with a pair of lock requests in the converting
2165   * queue for one resource.  The granted mode of each lock blocks the requested
2166   * mode of the other lock."
2167   *
2168   * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2169   * convert queue from being granted, then deadlk/demote lkb.
2170   *
2171   * Example:
2172   * Granted Queue: empty
2173   * Convert Queue: NL->EX (first lock)
2174   *                PR->EX (second lock)
2175   *
2176   * The first lock can't be granted because of the granted mode of the second
2177   * lock and the second lock can't be granted because it's not first in the
2178   * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2179   * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2180   * flag set and return DEMOTED in the lksb flags.
2181   *
2182   * Originally, this function detected conv-deadlk in a more limited scope:
2183   * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2184   * - if lkb1 was the first entry in the queue (not just earlier), and was
2185   *   blocked by the granted mode of lkb2, and there was nothing on the
2186   *   granted queue preventing lkb1 from being granted immediately, i.e.
2187   *   lkb2 was the only thing preventing lkb1 from being granted.
2188   *
2189   * That second condition meant we'd only say there was conv-deadlk if
2190   * resolving it (by demotion) would lead to the first lock on the convert
2191   * queue being granted right away.  It allowed conversion deadlocks to exist
2192   * between locks on the convert queue while they couldn't be granted anyway.
2193   *
2194   * Now, we detect and take action on conversion deadlocks immediately when
2195   * they're created, even if they may not be immediately consequential.  If
2196   * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2197   * mode that would prevent lkb1's conversion from being granted, we do a
2198   * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2199   * I think this means that the lkb_is_ahead condition below should always
2200   * be zero, i.e. there will never be conv-deadlk between two locks that are
2201   * both already on the convert queue.
2202   */
2203  
2204  static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2205  {
2206  	struct dlm_lkb *lkb1;
2207  	int lkb_is_ahead = 0;
2208  
2209  	list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2210  		if (lkb1 == lkb2) {
2211  			lkb_is_ahead = 1;
2212  			continue;
2213  		}
2214  
2215  		if (!lkb_is_ahead) {
2216  			if (!modes_compat(lkb2, lkb1))
2217  				return 1;
2218  		} else {
2219  			if (!modes_compat(lkb2, lkb1) &&
2220  			    !modes_compat(lkb1, lkb2))
2221  				return 1;
2222  		}
2223  	}
2224  	return 0;
2225  }
2226  
2227  /*
2228   * Return 1 if the lock can be granted, 0 otherwise.
2229   * Also detect and resolve conversion deadlocks.
2230   *
2231   * lkb is the lock to be granted
2232   *
2233   * now is 1 if the function is being called in the context of the
2234   * immediate request, it is 0 if called later, after the lock has been
2235   * queued.
2236   *
2237   * recover is 1 if dlm_recover_grant() is trying to grant conversions
2238   * after recovery.
2239   *
2240   * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2241   */
2242  
2243  static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2244  			   int recover)
2245  {
2246  	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2247  
2248  	/*
2249  	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
2250  	 * a new request for a NL mode lock being blocked.
2251  	 *
2252  	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2253  	 * request, then it would be granted.  In essence, the use of this flag
2254  	 * tells the Lock Manager to expedite this request by not considering
2255  	 * what may be in the CONVERTING or WAITING queues...  As of this
2256  	 * writing, the EXPEDITE flag can be used only with new requests for NL
2257  	 * mode locks.  This flag is not valid for conversion requests.
2258  	 *
2259  	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
2260  	 * conversion or used with a non-NL requested mode.  We also know an
2261  	 * EXPEDITE request is always granted immediately, so now must always
2262  	 * be 1.  The full condition to grant an expedite request: (now &&
2263  	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2264  	 * therefore be shortened to just checking the flag.
2265  	 */
2266  
2267  	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2268  		return 1;
2269  
2270  	/*
2271  	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2272  	 * added to the remaining conditions.
2273  	 */
2274  
2275  	if (queue_conflict(&r->res_grantqueue, lkb))
2276  		return 0;
2277  
2278  	/*
2279  	 * 6-3: By default, a conversion request is immediately granted if the
2280  	 * requested mode is compatible with the modes of all other granted
2281  	 * locks
2282  	 */
2283  
2284  	if (queue_conflict(&r->res_convertqueue, lkb))
2285  		return 0;
2286  
2287  	/*
2288  	 * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2289  	 * locks for a recovered rsb, on which lkb's have been rebuilt.
2290  	 * The lkb's may have been rebuilt on the queues in a different
2291  	 * order than they were in on the previous master.  So, granting
2292  	 * queued conversions in order after recovery doesn't make sense
2293  	 * since the order hasn't been preserved anyway.  The new order
2294  	 * could also have created a new "in place" conversion deadlock.
2295  	 * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2296  	 * After recovery, there would be no granted locks, and possibly
2297  	 * NL->EX, PR->EX, an in-place conversion deadlock.)  So, after
2298  	 * recovery, grant conversions without considering order.
2299  	 */
2300  
2301  	if (conv && recover)
2302  		return 1;
2303  
2304  	/*
2305  	 * 6-5: But the default algorithm for deciding whether to grant or
2306  	 * queue conversion requests does not by itself guarantee that such
2307  	 * requests are serviced on a "first come first serve" basis.  This, in
2308  	 * turn, can lead to a phenomenon known as "indefinite postponement".
2309  	 *
2310  	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
2311  	 * the system service employed to request a lock conversion.  This flag
2312  	 * forces certain conversion requests to be queued, even if they are
2313  	 * compatible with the granted modes of other locks on the same
2314  	 * resource.  Thus, the use of this flag results in conversion requests
2315  	 * being ordered on a "first come first serve" basis.
2316  	 *
2317  	 * DCT: This condition is all about new conversions being able to occur
2318  	 * "in place" while the lock remains on the granted queue (assuming
2319  	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
2320  	 * doesn't _have_ to go onto the convert queue where it's processed in
2321  	 * order.  The "now" variable is necessary to distinguish converts
2322  	 * being received and processed for the first time now, because once a
2323  	 * convert is moved to the conversion queue the condition below applies
2324  	 * requiring fifo granting.
2325  	 */
2326  
2327  	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2328  		return 1;
2329  
2330  	/*
2331  	 * Even if the convert is compat with all granted locks,
2332  	 * QUECVT forces it behind other locks on the convert queue.
2333  	 */
2334  
2335  	if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2336  		if (list_empty(&r->res_convertqueue))
2337  			return 1;
2338  		else
2339  			return 0;
2340  	}
2341  
2342  	/*
2343  	 * The NOORDER flag is set to avoid the standard vms rules on grant
2344  	 * order.
2345  	 */
2346  
2347  	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2348  		return 1;
2349  
2350  	/*
2351  	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2352  	 * granted until all other conversion requests ahead of it are granted
2353  	 * and/or canceled.
2354  	 */
2355  
2356  	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2357  		return 1;
2358  
2359  	/*
2360  	 * 6-4: By default, a new request is immediately granted only if all
2361  	 * three of the following conditions are satisfied when the request is
2362  	 * issued:
2363  	 * - The queue of ungranted conversion requests for the resource is
2364  	 *   empty.
2365  	 * - The queue of ungranted new requests for the resource is empty.
2366  	 * - The mode of the new request is compatible with the most
2367  	 *   restrictive mode of all granted locks on the resource.
2368  	 */
2369  
2370  	if (now && !conv && list_empty(&r->res_convertqueue) &&
2371  	    list_empty(&r->res_waitqueue))
2372  		return 1;
2373  
2374  	/*
2375  	 * 6-4: Once a lock request is in the queue of ungranted new requests,
2376  	 * it cannot be granted until the queue of ungranted conversion
2377  	 * requests is empty, all ungranted new requests ahead of it are
2378  	 * granted and/or canceled, and it is compatible with the granted mode
2379  	 * of the most restrictive lock granted on the resource.
2380  	 */
2381  
2382  	if (!now && !conv && list_empty(&r->res_convertqueue) &&
2383  	    first_in_list(lkb, &r->res_waitqueue))
2384  		return 1;
2385  
2386  	return 0;
2387  }
2388  
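/* can_be_granted() wraps _can_be_granted(): if the lock cannot be granted
   it also handles conversion deadlock (CONVDEADLK demotion, or -EDEADLK
   returned through *err) and retries with the ALTPR/ALTCW alternate modes. */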
2389  static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2390  			  int recover, int *err)
2391  {
2392  	int rv;
2393  	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2394  	int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2395  
2396  	if (err)
2397  		*err = 0;
2398  
2399  	rv = _can_be_granted(r, lkb, now, recover);
2400  	if (rv)
2401  		goto out;
2402  
2403  	/*
2404  	 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2405  	 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2406  	 * cancels one of the locks.
2407  	 */
2408  
2409  	if (is_convert && can_be_queued(lkb) &&
2410  	    conversion_deadlock_detect(r, lkb)) {
2411  		if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2412  			lkb->lkb_grmode = DLM_LOCK_NL;
2413  			set_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
2414  		} else if (err) {
2415  			*err = -EDEADLK;
2416  		} else {
2417  			log_print("can_be_granted deadlock %x now %d",
2418  				  lkb->lkb_id, now);
2419  			dlm_dump_rsb(r);
2420  		}
2421  		goto out;
2422  	}
2423  
2424  	/*
2425  	 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2426  	 * to grant a request in a mode other than the normal rqmode.  It's a
2427  	 * simple way to provide a big optimization to applications that can
2428  	 * use them.
2429  	 */
2430  
2431  	if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2432  		alt = DLM_LOCK_PR;
2433  	else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2434  		alt = DLM_LOCK_CW;
2435  
2436  	if (alt) {
2437  		lkb->lkb_rqmode = alt;
2438  		rv = _can_be_granted(r, lkb, now, 0);
2439  		if (rv)
2440  			set_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
2441  		else
2442  			lkb->lkb_rqmode = rqmode;
2443  	}
2444   out:
2445  	return rv;
2446  }
2447  
2448  /* Returns the highest requested mode of all blocked conversions; sets
2449     cw if there's a blocked conversion to DLM_LOCK_CW. */
2450  
2451  static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2452  				 unsigned int *count)
2453  {
2454  	struct dlm_lkb *lkb, *s;
2455  	int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2456  	int hi, demoted, quit, grant_restart, demote_restart;
2457  	int deadlk;
2458  
2459  	quit = 0;
2460   restart:
2461  	grant_restart = 0;
2462  	demote_restart = 0;
2463  	hi = DLM_LOCK_IV;
2464  
2465  	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2466  		demoted = is_demoted(lkb);
2467  		deadlk = 0;
2468  
2469  		if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2470  			grant_lock_pending(r, lkb);
2471  			grant_restart = 1;
2472  			if (count)
2473  				(*count)++;
2474  			continue;
2475  		}
2476  
2477  		if (!demoted && is_demoted(lkb)) {
2478  			log_print("WARN: pending demoted %x node %d %s",
2479  				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2480  			demote_restart = 1;
2481  			continue;
2482  		}
2483  
2484  		if (deadlk) {
2485  			/*
2486  			 * If the DLM_LKF_NODLCKWT flag is set and a conversion
2487  			 * deadlock is detected, we send a blocking AST so the
2488  			 * lock owner can down-convert (or cancel) the conversion.
2489  			 */
2490  			if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2491  				if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2492  					queue_bast(r, lkb, lkb->lkb_rqmode);
2493  					lkb->lkb_highbast = lkb->lkb_rqmode;
2494  				}
2495  			} else {
2496  				log_print("WARN: pending deadlock %x node %d %s",
2497  					  lkb->lkb_id, lkb->lkb_nodeid,
2498  					  r->res_name);
2499  				dlm_dump_rsb(r);
2500  			}
2501  			continue;
2502  		}
2503  
2504  		hi = max_t(int, lkb->lkb_rqmode, hi);
2505  
2506  		if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2507  			*cw = 1;
2508  	}
2509  
2510  	if (grant_restart)
2511  		goto restart;
2512  	if (demote_restart && !quit) {
2513  		quit = 1;
2514  		goto restart;
2515  	}
2516  
2517  	return max_t(int, high, hi);
2518  }
2519  
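/* Grant whatever new requests on the wait queue can now be granted;
   return the highest rqmode among those still blocked and set *cw if a
   blocked request wants DLM_LOCK_CW. */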
2520  static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2521  			      unsigned int *count)
2522  {
2523  	struct dlm_lkb *lkb, *s;
2524  
2525  	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2526  		if (can_be_granted(r, lkb, 0, 0, NULL)) {
2527  			grant_lock_pending(r, lkb);
2528  			if (count)
2529  				(*count)++;
2530  		} else {
2531  			high = max_t(int, lkb->lkb_rqmode, high);
2532  			if (lkb->lkb_rqmode == DLM_LOCK_CW)
2533  				*cw = 1;
2534  		}
2535  	}
2536  
2537  	return high;
2538  }
2539  
2540  /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2541     on either the convert or waiting queue.
2542     high is the largest rqmode of all locks blocked on the convert or
2543     waiting queue. */
2544  
2545  static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2546  {
2547  	if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2548  		if (gr->lkb_highbast < DLM_LOCK_EX)
2549  			return 1;
2550  		return 0;
2551  	}
2552  
2553  	if (gr->lkb_highbast < high &&
2554  	    !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2555  		return 1;
2556  	return 0;
2557  }
2558  
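/* Called on the master when locks may have become grantable (e.g. after
   an unlock, cancel or convert): grant pending conversions and requests,
   then send blocking ASTs to granted locks that conflict with the highest
   mode still blocked. */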
2559  static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2560  {
2561  	struct dlm_lkb *lkb, *s;
2562  	int high = DLM_LOCK_IV;
2563  	int cw = 0;
2564  
2565  	if (!is_master(r)) {
2566  		log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2567  		dlm_dump_rsb(r);
2568  		return;
2569  	}
2570  
2571  	high = grant_pending_convert(r, high, &cw, count);
2572  	high = grant_pending_wait(r, high, &cw, count);
2573  
2574  	if (high == DLM_LOCK_IV)
2575  		return;
2576  
2577  	/*
2578  	 * If there are locks left on the wait/convert queue then send blocking
2579  	 * ASTs to granted locks based on the largest requested mode (high)
2580  	 * found above.
2581  	 */
2582  
2583  	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2584  		if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2585  			if (cw && high == DLM_LOCK_PR &&
2586  			    lkb->lkb_grmode == DLM_LOCK_PR)
2587  				queue_bast(r, lkb, DLM_LOCK_CW);
2588  			else
2589  				queue_bast(r, lkb, high);
2590  			lkb->lkb_highbast = high;
2591  		}
2592  	}
2593  }
2594  
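/* Does the granted lock gr need a blocking AST because of request rq?
   PR and CW block each other as a special case; otherwise a bast is
   needed if gr conflicts with rq's requested mode and has not already
   been sent a bast for that mode or higher. */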
2595  static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2596  {
2597  	if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2598  	    (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2599  		if (gr->lkb_highbast < DLM_LOCK_EX)
2600  			return 1;
2601  		return 0;
2602  	}
2603  
2604  	if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2605  		return 1;
2606  	return 0;
2607  }
2608  
2609  static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2610  			    struct dlm_lkb *lkb)
2611  {
2612  	struct dlm_lkb *gr;
2613  
2614  	list_for_each_entry(gr, head, lkb_statequeue) {
2615  		/* skip self when sending basts to convertqueue */
2616  		if (gr == lkb)
2617  			continue;
2618  		if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2619  			queue_bast(r, gr, lkb->lkb_rqmode);
2620  			gr->lkb_highbast = lkb->lkb_rqmode;
2621  		}
2622  	}
2623  }
2624  
2625  static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2626  {
2627  	send_bast_queue(r, &r->res_grantqueue, lkb);
2628  }
2629  
2630  static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2631  {
2632  	send_bast_queue(r, &r->res_grantqueue, lkb);
2633  	send_bast_queue(r, &r->res_convertqueue, lkb);
2634  }
2635  
2636  /* set_master(r, lkb) -- set the master nodeid of a resource
2637  
2638     The purpose of this function is to set the nodeid field in the given
2639     lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2640     known, it can just be copied to the lkb and the function will return
2641     0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2642     before it can be copied to the lkb.
2643  
2644     When the rsb nodeid is being looked up remotely, the initial lkb
2645     causing the lookup is kept on the ls_waiters list waiting for the
2646     lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2647     on the rsb's res_lookup list until the master is verified.
2648  
2649     Return values:
2650     0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2651     1: the rsb master is not available and the lkb has been placed on
2652        a wait queue
2653  */
2654  
2655  static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2656  {
2657  	int our_nodeid = dlm_our_nodeid();
2658  
2659  	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2660  		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2661  		r->res_first_lkid = lkb->lkb_id;
2662  		lkb->lkb_nodeid = r->res_nodeid;
2663  		return 0;
2664  	}
2665  
2666  	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2667  		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2668  		return 1;
2669  	}
2670  
2671  	if (r->res_master_nodeid == our_nodeid) {
2672  		lkb->lkb_nodeid = 0;
2673  		return 0;
2674  	}
2675  
2676  	if (r->res_master_nodeid) {
2677  		lkb->lkb_nodeid = r->res_master_nodeid;
2678  		return 0;
2679  	}
2680  
2681  	if (dlm_dir_nodeid(r) == our_nodeid) {
2682  		/* This is a somewhat unusual case; find_rsb will usually
2683  		   have set res_master_nodeid when dir nodeid is local, but
2684  		   there are cases where we become the dir node after we've
2685  		   passed find_rsb and go through _request_lock again.
2686  		   confirm_master() or process_lookup_list() needs to be
2687  		   called after this. */
2688  		log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2689  			  lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2690  			  r->res_name);
2691  		r->res_master_nodeid = our_nodeid;
2692  		r->res_nodeid = 0;
2693  		lkb->lkb_nodeid = 0;
2694  		return 0;
2695  	}
2696  
2697  	r->res_first_lkid = lkb->lkb_id;
2698  	send_lookup(r, lkb);
2699  	return 1;
2700  }
2701  
2702  static void process_lookup_list(struct dlm_rsb *r)
2703  {
2704  	struct dlm_lkb *lkb, *safe;
2705  
2706  	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2707  		list_del_init(&lkb->lkb_rsb_lookup);
2708  		_request_lock(r, lkb);
2709  	}
2710  }
2711  
2712  /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2713  
2714  static void confirm_master(struct dlm_rsb *r, int error)
2715  {
2716  	struct dlm_lkb *lkb;
2717  
2718  	if (!r->res_first_lkid)
2719  		return;
2720  
2721  	switch (error) {
2722  	case 0:
2723  	case -EINPROGRESS:
2724  		r->res_first_lkid = 0;
2725  		process_lookup_list(r);
2726  		break;
2727  
2728  	case -EAGAIN:
2729  	case -EBADR:
2730  	case -ENOTBLK:
2731  		/* the remote request failed and won't be retried (it was
2732  		   a NOQUEUE, or has been canceled/unlocked); make a waiting
2733  		   lkb the first_lkid */
2734  
2735  		r->res_first_lkid = 0;
2736  
2737  		if (!list_empty(&r->res_lookup)) {
2738  			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2739  					 lkb_rsb_lookup);
2740  			list_del_init(&lkb->lkb_rsb_lookup);
2741  			r->res_first_lkid = lkb->lkb_id;
2742  			_request_lock(r, lkb);
2743  		}
2744  		break;
2745  
2746  	default:
2747  		log_error(r->res_ls, "confirm_master unknown error %d", error);
2748  	}
2749  }
2750  
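/* Check the dlm_lock() arguments for invalid flag/mode combinations and
   pack them into dlm_args; they are copied into the lkb later by
   validate_lock_args(). */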
2751  static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2752  			 int namelen, void (*ast)(void *astparam),
2753  			 void *astparam,
2754  			 void (*bast)(void *astparam, int mode),
2755  			 struct dlm_args *args)
2756  {
2757  	int rv = -EINVAL;
2758  
2759  	/* check for invalid arg usage */
2760  
2761  	if (mode < 0 || mode > DLM_LOCK_EX)
2762  		goto out;
2763  
2764  	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2765  		goto out;
2766  
2767  	if (flags & DLM_LKF_CANCEL)
2768  		goto out;
2769  
2770  	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2771  		goto out;
2772  
2773  	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2774  		goto out;
2775  
2776  	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2777  		goto out;
2778  
2779  	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2780  		goto out;
2781  
2782  	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2783  		goto out;
2784  
2785  	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2786  		goto out;
2787  
2788  	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2789  		goto out;
2790  
2791  	if (!ast || !lksb)
2792  		goto out;
2793  
2794  	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2795  		goto out;
2796  
2797  	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2798  		goto out;
2799  
2800  	/* these args will be copied to the lkb in validate_lock_args;
2801  	   this cannot be done now because, when converting locks, fields in
2802  	   an active lkb cannot be modified before locking the rsb */
2803  
2804  	args->flags = flags;
2805  	args->astfn = ast;
2806  	args->astparam = astparam;
2807  	args->bastfn = bast;
2808  	args->mode = mode;
2809  	args->lksb = lksb;
2810  	rv = 0;
2811   out:
2812  	return rv;
2813  }
2814  
2815  static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2816  {
2817  	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2818   		      DLM_LKF_FORCEUNLOCK))
2819  		return -EINVAL;
2820  
2821  	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2822  		return -EINVAL;
2823  
2824  	args->flags = flags;
2825  	args->astparam = astarg;
2826  	return 0;
2827  }
2828  
2829  static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2830  			      struct dlm_args *args)
2831  {
2832  	int rv = -EBUSY;
2833  
2834  	if (args->flags & DLM_LKF_CONVERT) {
2835  		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2836  			goto out;
2837  
2838  		/* lock not allowed if there's any op in progress */
2839  		if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2840  			goto out;
2841  
2842  		if (is_overlap(lkb))
2843  			goto out;
2844  
2845  		rv = -EINVAL;
2846  		if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
2847  			goto out;
2848  
2849  		if (args->flags & DLM_LKF_QUECVT &&
2850  		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2851  			goto out;
2852  	}
2853  
2854  	lkb->lkb_exflags = args->flags;
2855  	dlm_set_sbflags_val(lkb, 0);
2856  	lkb->lkb_astfn = args->astfn;
2857  	lkb->lkb_astparam = args->astparam;
2858  	lkb->lkb_bastfn = args->bastfn;
2859  	lkb->lkb_rqmode = args->mode;
2860  	lkb->lkb_lksb = args->lksb;
2861  	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2862  	lkb->lkb_ownpid = (int) current->pid;
2863  	rv = 0;
2864   out:
2865  	switch (rv) {
2866  	case 0:
2867  		break;
2868  	case -EINVAL:
2869  		/* annoy the user because dlm usage is wrong */
2870  		WARN_ON(1);
2871  		log_error(ls, "%s %d %x %x %x %d %d", __func__,
2872  			  rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2873  			  lkb->lkb_status, lkb->lkb_wait_type);
2874  		break;
2875  	default:
2876  		log_debug(ls, "%s %d %x %x %x %d %d", __func__,
2877  			  rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2878  			  lkb->lkb_status, lkb->lkb_wait_type);
2879  		break;
2880  	}
2881  
2882  	return rv;
2883  }
2884  
2885  /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2886     for success */
2887  
2888  /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2889     because there may be a lookup in progress and it's valid to do
2890     cancel/unlockf on it */
2891  
2892  static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2893  {
2894  	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2895  	int rv = -EBUSY;
2896  
2897  	/* normal unlock not allowed if there's any op in progress */
2898  	if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) &&
2899  	    (lkb->lkb_wait_type || lkb->lkb_wait_count))
2900  		goto out;
2901  
2902  	/* an lkb may be waiting for an rsb lookup to complete where the
2903  	   lookup was initiated by another lock */
2904  
2905  	if (!list_empty(&lkb->lkb_rsb_lookup)) {
2906  		if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2907  			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2908  			list_del_init(&lkb->lkb_rsb_lookup);
2909  			queue_cast(lkb->lkb_resource, lkb,
2910  				   args->flags & DLM_LKF_CANCEL ?
2911  				   -DLM_ECANCEL : -DLM_EUNLOCK);
2912  			unhold_lkb(lkb); /* undoes create_lkb() */
2913  		}
2914  		/* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2915  		goto out;
2916  	}
2917  
2918  	rv = -EINVAL;
2919  	if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
2920  		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2921  		dlm_print_lkb(lkb);
2922  		goto out;
2923  	}
2924  
2925  	/* an lkb may still exist even though the lock is EOL'ed due to a
2926  	 * cancel, unlock or failed noqueue request; an app can't use these
2927  	 * locks; return same error as if the lkid had not been found at all
2928  	 */
2929  
2930  	if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
2931  		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2932  		rv = -ENOENT;
2933  		goto out;
2934  	}
2935  
2936  	if (is_overlap_unlock(lkb))
2937  		goto out;
2938  
2939  	/* cancel not allowed with another cancel/unlock in progress */
2940  
2941  	if (args->flags & DLM_LKF_CANCEL) {
2942  		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2943  			goto out;
2944  
2945  		if (is_overlap_cancel(lkb))
2946  			goto out;
2947  
2948  		if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2949  			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2950  			rv = -EBUSY;
2951  			goto out;
2952  		}
2953  
2954  		/* there's nothing to cancel */
2955  		if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2956  		    !lkb->lkb_wait_type) {
2957  			rv = -EBUSY;
2958  			goto out;
2959  		}
2960  
2961  		switch (lkb->lkb_wait_type) {
2962  		case DLM_MSG_LOOKUP:
2963  		case DLM_MSG_REQUEST:
2964  			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2965  			rv = -EBUSY;
2966  			goto out;
2967  		case DLM_MSG_UNLOCK:
2968  		case DLM_MSG_CANCEL:
2969  			goto out;
2970  		}
2971  		/* add_to_waiters() will set OVERLAP_CANCEL */
2972  		goto out_ok;
2973  	}
2974  
2975  	/* do we need to allow a force-unlock if there's a normal unlock
2976  	   already in progress?  in what conditions could the normal unlock
2977  	   fail such that we'd want to send a force-unlock to be sure? */
2978  
2979  	if (args->flags & DLM_LKF_FORCEUNLOCK) {
2980  		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2981  			goto out;
2982  
2983  		if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2984  			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2985  			rv = -EBUSY;
2986  			goto out;
2987  		}
2988  
2989  		switch (lkb->lkb_wait_type) {
2990  		case DLM_MSG_LOOKUP:
2991  		case DLM_MSG_REQUEST:
2992  			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2993  			rv = -EBUSY;
2994  			goto out;
2995  		case DLM_MSG_UNLOCK:
2996  			goto out;
2997  		}
2998  		/* add_to_waiters() will set OVERLAP_UNLOCK */
2999  	}
3000  
3001   out_ok:
3002  	/* an overlapping op shouldn't blow away exflags from other op */
3003  	lkb->lkb_exflags |= args->flags;
3004  	dlm_set_sbflags_val(lkb, 0);
3005  	lkb->lkb_astparam = args->astparam;
3006  	rv = 0;
3007   out:
3008  	switch (rv) {
3009  	case 0:
3010  		break;
3011  	case -EINVAL:
3012  		/* annoy the user because dlm usage is wrong */
3013  		WARN_ON(1);
3014  		log_error(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
3015  			  lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
3016  			  args->flags, lkb->lkb_wait_type,
3017  			  lkb->lkb_resource->res_name);
3018  		break;
3019  	default:
3020  		log_debug(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
3021  			  lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
3022  			  args->flags, lkb->lkb_wait_type,
3023  			  lkb->lkb_resource->res_name);
3024  		break;
3025  	}
3026  
3027  	return rv;
3028  }
3029  
3030  /*
3031   * Four stage 4 varieties:
3032   * do_request(), do_convert(), do_unlock(), do_cancel()
3033   * These are called on the master node for the given lock and
3034   * from the central locking logic.
3035   */
3036  
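/* Each do_xxxx() below has a matching do_xxxx_effects() that sends or
   queues the blocking ASTs resulting from the operation.  They are kept
   separate so that, for remote operations, the reply message can be sent
   between the operation and its effects (see _request_lock() etc.). */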
3037  static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3038  {
3039  	int error = 0;
3040  
3041  	if (can_be_granted(r, lkb, 1, 0, NULL)) {
3042  		grant_lock(r, lkb);
3043  		queue_cast(r, lkb, 0);
3044  		goto out;
3045  	}
3046  
3047  	if (can_be_queued(lkb)) {
3048  		error = -EINPROGRESS;
3049  		add_lkb(r, lkb, DLM_LKSTS_WAITING);
3050  		goto out;
3051  	}
3052  
3053  	error = -EAGAIN;
3054  	queue_cast(r, lkb, -EAGAIN);
3055   out:
3056  	return error;
3057  }
3058  
3059  static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3060  			       int error)
3061  {
3062  	switch (error) {
3063  	case -EAGAIN:
3064  		if (force_blocking_asts(lkb))
3065  			send_blocking_asts_all(r, lkb);
3066  		break;
3067  	case -EINPROGRESS:
3068  		send_blocking_asts(r, lkb);
3069  		break;
3070  	}
3071  }
3072  
3073  static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3074  {
3075  	int error = 0;
3076  	int deadlk = 0;
3077  
3078  	/* changing an existing lock may allow others to be granted */
3079  
3080  	if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3081  		grant_lock(r, lkb);
3082  		queue_cast(r, lkb, 0);
3083  		goto out;
3084  	}
3085  
3086  	/* can_be_granted() detected that this lock would block in a conversion
3087  	   deadlock, so we leave it on the granted queue and return EDEADLK in
3088  	   the ast for the convert. */
3089  
3090  	if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
3091  		/* it's left on the granted queue */
3092  		revert_lock(r, lkb);
3093  		queue_cast(r, lkb, -EDEADLK);
3094  		error = -EDEADLK;
3095  		goto out;
3096  	}
3097  
3098  	/* is_demoted() means the can_be_granted() above set the grmode
3099  	   to NL, and left us on the granted queue.  This auto-demotion
3100  	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
3101  	   now grantable.  We have to try to grant other converting locks
3102  	   before we try again to grant this one. */
3103  
3104  	if (is_demoted(lkb)) {
3105  		grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3106  		if (_can_be_granted(r, lkb, 1, 0)) {
3107  			grant_lock(r, lkb);
3108  			queue_cast(r, lkb, 0);
3109  			goto out;
3110  		}
3111  		/* else fall through and move to convert queue */
3112  	}
3113  
3114  	if (can_be_queued(lkb)) {
3115  		error = -EINPROGRESS;
3116  		del_lkb(r, lkb);
3117  		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3118  		goto out;
3119  	}
3120  
3121  	error = -EAGAIN;
3122  	queue_cast(r, lkb, -EAGAIN);
3123   out:
3124  	return error;
3125  }
3126  
3127  static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3128  			       int error)
3129  {
3130  	switch (error) {
3131  	case 0:
3132  		grant_pending_locks(r, NULL);
3133  		/* grant_pending_locks also sends basts */
3134  		break;
3135  	case -EAGAIN:
3136  		if (force_blocking_asts(lkb))
3137  			send_blocking_asts_all(r, lkb);
3138  		break;
3139  	case -EINPROGRESS:
3140  		send_blocking_asts(r, lkb);
3141  		break;
3142  	}
3143  }
3144  
3145  static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3146  {
3147  	remove_lock(r, lkb);
3148  	queue_cast(r, lkb, -DLM_EUNLOCK);
3149  	return -DLM_EUNLOCK;
3150  }
3151  
3152  static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3153  			      int error)
3154  {
3155  	grant_pending_locks(r, NULL);
3156  }
3157  
3158  /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3159  
3160  static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3161  {
3162  	int error;
3163  
3164  	error = revert_lock(r, lkb);
3165  	if (error) {
3166  		queue_cast(r, lkb, -DLM_ECANCEL);
3167  		return -DLM_ECANCEL;
3168  	}
3169  	return 0;
3170  }
3171  
3172  static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3173  			      int error)
3174  {
3175  	if (error)
3176  		grant_pending_locks(r, NULL);
3177  }
3178  
3179  /*
3180   * Four stage 3 varieties:
3181   * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3182   */
3183  
3184  /* add a new lkb to a possibly new rsb, called by requesting process */
3185  
3186  static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3187  {
3188  	int error;
3189  
3190  	/* set_master: sets lkb nodeid from r */
3191  
3192  	error = set_master(r, lkb);
3193  	if (error < 0)
3194  		goto out;
3195  	if (error) {
3196  		error = 0;
3197  		goto out;
3198  	}
3199  
3200  	if (is_remote(r)) {
3201  		/* receive_request() calls do_request() on remote node */
3202  		error = send_request(r, lkb);
3203  	} else {
3204  		error = do_request(r, lkb);
3205  		/* for remote locks the request_reply is sent
3206  		   between do_request and do_request_effects */
3207  		do_request_effects(r, lkb, error);
3208  	}
3209   out:
3210  	return error;
3211  }
3212  
3213  /* change some property of an existing lkb, e.g. mode */
3214  
3215  static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3216  {
3217  	int error;
3218  
3219  	if (is_remote(r)) {
3220  		/* receive_convert() calls do_convert() on remote node */
3221  		error = send_convert(r, lkb);
3222  	} else {
3223  		error = do_convert(r, lkb);
3224  		/* for remote locks the convert_reply is sent
3225  		   between do_convert and do_convert_effects */
3226  		do_convert_effects(r, lkb, error);
3227  	}
3228  
3229  	return error;
3230  }
3231  
3232  /* remove an existing lkb from the granted queue */
3233  
3234  static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3235  {
3236  	int error;
3237  
3238  	if (is_remote(r)) {
3239  		/* receive_unlock() calls do_unlock() on remote node */
3240  		error = send_unlock(r, lkb);
3241  	} else {
3242  		error = do_unlock(r, lkb);
3243  		/* for remote locks the unlock_reply is sent
3244  		   between do_unlock and do_unlock_effects */
3245  		do_unlock_effects(r, lkb, error);
3246  	}
3247  
3248  	return error;
3249  }
3250  
3251  /* remove an existing lkb from the convert or wait queue */
3252  
3253  static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3254  {
3255  	int error;
3256  
3257  	if (is_remote(r)) {
3258  		/* receive_cancel() calls do_cancel() on remote node */
3259  		error = send_cancel(r, lkb);
3260  	} else {
3261  		error = do_cancel(r, lkb);
3262  		/* for remote locks the cancel_reply is sent
3263  		   between do_cancel and do_cancel_effects */
3264  		do_cancel_effects(r, lkb, error);
3265  	}
3266  
3267  	return error;
3268  }
3269  
3270  /*
3271   * Four stage 2 varieties:
3272   * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3273   */
3274  
3275  static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3276  			const void *name, int len,
3277  			struct dlm_args *args)
3278  {
3279  	struct dlm_rsb *r;
3280  	int error;
3281  
3282  	error = validate_lock_args(ls, lkb, args);
3283  	if (error)
3284  		return error;
3285  
3286  	error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3287  	if (error)
3288  		return error;
3289  
3290  	lock_rsb(r);
3291  
3292  	attach_lkb(r, lkb);
3293  	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3294  
3295  	error = _request_lock(r, lkb);
3296  
3297  	unlock_rsb(r);
3298  	put_rsb(r);
3299  	return error;
3300  }
3301  
3302  static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3303  			struct dlm_args *args)
3304  {
3305  	struct dlm_rsb *r;
3306  	int error;
3307  
3308  	r = lkb->lkb_resource;
3309  
3310  	hold_rsb(r);
3311  	lock_rsb(r);
3312  
3313  	error = validate_lock_args(ls, lkb, args);
3314  	if (error)
3315  		goto out;
3316  
3317  	error = _convert_lock(r, lkb);
3318   out:
3319  	unlock_rsb(r);
3320  	put_rsb(r);
3321  	return error;
3322  }
3323  
3324  static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3325  		       struct dlm_args *args)
3326  {
3327  	struct dlm_rsb *r;
3328  	int error;
3329  
3330  	r = lkb->lkb_resource;
3331  
3332  	hold_rsb(r);
3333  	lock_rsb(r);
3334  
3335  	error = validate_unlock_args(lkb, args);
3336  	if (error)
3337  		goto out;
3338  
3339  	error = _unlock_lock(r, lkb);
3340   out:
3341  	unlock_rsb(r);
3342  	put_rsb(r);
3343  	return error;
3344  }
3345  
3346  static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3347  		       struct dlm_args *args)
3348  {
3349  	struct dlm_rsb *r;
3350  	int error;
3351  
3352  	r = lkb->lkb_resource;
3353  
3354  	hold_rsb(r);
3355  	lock_rsb(r);
3356  
3357  	error = validate_unlock_args(lkb, args);
3358  	if (error)
3359  		goto out;
3360  
3361  	error = _cancel_lock(r, lkb);
3362   out:
3363  	unlock_rsb(r);
3364  	put_rsb(r);
3365  	return error;
3366  }
3367  
3368  /*
3369   * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
3370   */
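
/* Illustrative caller-side sketch (not part of this file): a hedged
   example of how a kernel user of the dlm API typically drives these two
   entry points.  The lockspace handle "ls", the resource name
   "example_res" and the callback/function names below are hypothetical;
   a real caller obtains "ls" from the lockspace API, checks every return
   value, and waits for the completion ast between steps.

	static struct dlm_lksb my_lksb;

	static void my_ast(void *arg)
	{
		// completion: my_lksb.sb_status holds 0, -EAGAIN, -EDEADLK,
		// -DLM_EUNLOCK, -DLM_ECANCEL, ...
	}

	static void my_bast(void *arg, int mode)
	{
		// another node is blocked waiting for "mode"; convert or unlock soon
	}

	static int example(dlm_lockspace_t *ls)
	{
		int error;

		// request an EX lock on the named resource; sb_lkid is filled in
		error = dlm_lock(ls, DLM_LOCK_EX, &my_lksb, 0, "example_res",
				 strlen("example_res"), 0, my_ast, NULL, my_bast);
		if (error)
			return error;

		// later, convert the same lock (identified by sb_lkid) down to NL
		error = dlm_lock(ls, DLM_LOCK_NL, &my_lksb, DLM_LKF_CONVERT,
				 NULL, 0, 0, my_ast, NULL, my_bast);
		if (error)
			return error;

		// finally drop it; the completion ast sees -DLM_EUNLOCK in sb_status
		return dlm_unlock(ls, my_lksb.sb_lkid, 0, &my_lksb, NULL);
	}
*/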
3371  
3372  int dlm_lock(dlm_lockspace_t *lockspace,
3373  	     int mode,
3374  	     struct dlm_lksb *lksb,
3375  	     uint32_t flags,
3376  	     const void *name,
3377  	     unsigned int namelen,
3378  	     uint32_t parent_lkid,
3379  	     void (*ast) (void *astarg),
3380  	     void *astarg,
3381  	     void (*bast) (void *astarg, int mode))
3382  {
3383  	struct dlm_ls *ls;
3384  	struct dlm_lkb *lkb;
3385  	struct dlm_args args;
3386  	int error, convert = flags & DLM_LKF_CONVERT;
3387  
3388  	ls = dlm_find_lockspace_local(lockspace);
3389  	if (!ls)
3390  		return -EINVAL;
3391  
3392  	dlm_lock_recovery(ls);
3393  
3394  	if (convert)
3395  		error = find_lkb(ls, lksb->sb_lkid, &lkb);
3396  	else
3397  		error = create_lkb(ls, &lkb);
3398  
3399  	if (error)
3400  		goto out;
3401  
3402  	trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
3403  
3404  	error = set_lock_args(mode, lksb, flags, namelen, ast, astarg, bast,
3405  			      &args);
3406  	if (error)
3407  		goto out_put;
3408  
3409  	if (convert)
3410  		error = convert_lock(ls, lkb, &args);
3411  	else
3412  		error = request_lock(ls, lkb, name, namelen, &args);
3413  
3414  	if (error == -EINPROGRESS)
3415  		error = 0;
3416   out_put:
3417  	trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true);
3418  
3419  	if (convert || error)
3420  		__put_lkb(ls, lkb);
3421  	if (error == -EAGAIN || error == -EDEADLK)
3422  		error = 0;
3423   out:
3424  	dlm_unlock_recovery(ls);
3425  	dlm_put_lockspace(ls);
3426  	return error;
3427  }
3428  
3429  int dlm_unlock(dlm_lockspace_t *lockspace,
3430  	       uint32_t lkid,
3431  	       uint32_t flags,
3432  	       struct dlm_lksb *lksb,
3433  	       void *astarg)
3434  {
3435  	struct dlm_ls *ls;
3436  	struct dlm_lkb *lkb;
3437  	struct dlm_args args;
3438  	int error;
3439  
3440  	ls = dlm_find_lockspace_local(lockspace);
3441  	if (!ls)
3442  		return -EINVAL;
3443  
3444  	dlm_lock_recovery(ls);
3445  
3446  	error = find_lkb(ls, lkid, &lkb);
3447  	if (error)
3448  		goto out;
3449  
3450  	trace_dlm_unlock_start(ls, lkb, flags);
3451  
3452  	error = set_unlock_args(flags, astarg, &args);
3453  	if (error)
3454  		goto out_put;
3455  
3456  	if (flags & DLM_LKF_CANCEL)
3457  		error = cancel_lock(ls, lkb, &args);
3458  	else
3459  		error = unlock_lock(ls, lkb, &args);
3460  
3461  	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3462  		error = 0;
3463  	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3464  		error = 0;
3465   out_put:
3466  	trace_dlm_unlock_end(ls, lkb, flags, error);
3467  
3468  	dlm_put_lkb(lkb);
3469   out:
3470  	dlm_unlock_recovery(ls);
3471  	dlm_put_lockspace(ls);
3472  	return error;
3473  }
3474  
3475  /*
3476   * send/receive routines for remote operations and replies
3477   *
3478   * send_args
3479   * send_common
3480   * send_request			receive_request
3481   * send_convert			receive_convert
3482   * send_unlock			receive_unlock
3483   * send_cancel			receive_cancel
3484   * send_grant			receive_grant
3485   * send_bast			receive_bast
3486   * send_lookup			receive_lookup
3487   * send_remove			receive_remove
3488   *
3489   * 				send_common_reply
3490   * receive_request_reply	send_request_reply
3491   * receive_convert_reply	send_convert_reply
3492   * receive_unlock_reply		send_unlock_reply
3493   * receive_cancel_reply		send_cancel_reply
3494   * receive_lookup_reply		send_lookup_reply
3495   */
3496  
3497  static int _create_message(struct dlm_ls *ls, int mb_len,
3498  			   int to_nodeid, int mstype,
3499  			   struct dlm_message **ms_ret,
3500  			   struct dlm_mhandle **mh_ret)
3501  {
3502  	struct dlm_message *ms;
3503  	struct dlm_mhandle *mh;
3504  	char *mb;
3505  
3506  	/* dlm_midcomms_get_mhandle() gives us a message handle (mh) that we
3507  	   need to pass into dlm_midcomms_commit_mhandle() and a message
3508  	   buffer (mb) that we write our data into */
3509  
3510  	mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb);
3511  	if (!mh)
3512  		return -ENOBUFS;
3513  
3514  	ms = (struct dlm_message *) mb;
3515  
3516  	ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3517  	ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
3518  	ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
3519  	ms->m_header.h_length = cpu_to_le16(mb_len);
3520  	ms->m_header.h_cmd = DLM_MSG;
3521  
3522  	ms->m_type = cpu_to_le32(mstype);
3523  
3524  	*mh_ret = mh;
3525  	*ms_ret = ms;
3526  	return 0;
3527  }
3528  
3529  static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3530  			  int to_nodeid, int mstype,
3531  			  struct dlm_message **ms_ret,
3532  			  struct dlm_mhandle **mh_ret)
3533  {
3534  	int mb_len = sizeof(struct dlm_message);
3535  
3536  	switch (mstype) {
3537  	case DLM_MSG_REQUEST:
3538  	case DLM_MSG_LOOKUP:
3539  	case DLM_MSG_REMOVE:
3540  		mb_len += r->res_length;
3541  		break;
3542  	case DLM_MSG_CONVERT:
3543  	case DLM_MSG_UNLOCK:
3544  	case DLM_MSG_REQUEST_REPLY:
3545  	case DLM_MSG_CONVERT_REPLY:
3546  	case DLM_MSG_GRANT:
3547  		if (lkb && lkb->lkb_lvbptr && (lkb->lkb_exflags & DLM_LKF_VALBLK))
3548  			mb_len += r->res_ls->ls_lvblen;
3549  		break;
3550  	}
3551  
3552  	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3553  			       ms_ret, mh_ret);
3554  }
3555  
3556  /* further lowcomms enhancements or alternate implementations may make
3557     the return value from this function useful at some point */
3558  
3559  static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms,
3560  			const void *name, int namelen)
3561  {
3562  	dlm_midcomms_commit_mhandle(mh, name, namelen);
3563  	return 0;
3564  }
3565  
3566  static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3567  		      struct dlm_message *ms)
3568  {
3569  	ms->m_nodeid   = cpu_to_le32(lkb->lkb_nodeid);
3570  	ms->m_pid      = cpu_to_le32(lkb->lkb_ownpid);
3571  	ms->m_lkid     = cpu_to_le32(lkb->lkb_id);
3572  	ms->m_remid    = cpu_to_le32(lkb->lkb_remid);
3573  	ms->m_exflags  = cpu_to_le32(lkb->lkb_exflags);
3574  	ms->m_sbflags  = cpu_to_le32(dlm_sbflags_val(lkb));
3575  	ms->m_flags    = cpu_to_le32(dlm_dflags_val(lkb));
3576  	ms->m_lvbseq   = cpu_to_le32(lkb->lkb_lvbseq);
3577  	ms->m_status   = cpu_to_le32(lkb->lkb_status);
3578  	ms->m_grmode   = cpu_to_le32(lkb->lkb_grmode);
3579  	ms->m_rqmode   = cpu_to_le32(lkb->lkb_rqmode);
3580  	ms->m_hash     = cpu_to_le32(r->res_hash);
3581  
3582  	/* m_result and m_bastmode are set from function args,
3583  	   not from lkb fields */
3584  
3585  	if (lkb->lkb_bastfn)
3586  		ms->m_asts |= cpu_to_le32(DLM_CB_BAST);
3587  	if (lkb->lkb_astfn)
3588  		ms->m_asts |= cpu_to_le32(DLM_CB_CAST);
3589  
3590  	/* compare with switch in create_message; send_remove() doesn't
3591  	   use send_args() */
3592  
3593  	switch (ms->m_type) {
3594  	case cpu_to_le32(DLM_MSG_REQUEST):
3595  	case cpu_to_le32(DLM_MSG_LOOKUP):
3596  		memcpy(ms->m_extra, r->res_name, r->res_length);
3597  		break;
3598  	case cpu_to_le32(DLM_MSG_CONVERT):
3599  	case cpu_to_le32(DLM_MSG_UNLOCK):
3600  	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3601  	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3602  	case cpu_to_le32(DLM_MSG_GRANT):
3603  		if (!lkb->lkb_lvbptr || !(lkb->lkb_exflags & DLM_LKF_VALBLK))
3604  			break;
3605  		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3606  		break;
3607  	}
3608  }
3609  
3610  static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3611  {
3612  	struct dlm_message *ms;
3613  	struct dlm_mhandle *mh;
3614  	int to_nodeid, error;
3615  
3616  	to_nodeid = r->res_nodeid;
3617  
3618  	add_to_waiters(lkb, mstype, to_nodeid);
3619  	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3620  	if (error)
3621  		goto fail;
3622  
3623  	send_args(r, lkb, ms);
3624  
3625  	error = send_message(mh, ms, r->res_name, r->res_length);
3626  	if (error)
3627  		goto fail;
3628  	return 0;
3629  
3630   fail:
3631  	remove_from_waiters(lkb, msg_reply_type(mstype));
3632  	return error;
3633  }
3634  
3635  static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3636  {
3637  	return send_common(r, lkb, DLM_MSG_REQUEST);
3638  }
3639  
3640  static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3641  {
3642  	int error;
3643  
3644  	error = send_common(r, lkb, DLM_MSG_CONVERT);
3645  
3646  	/* down conversions go without a reply from the master */
3647  	if (!error && down_conversion(lkb)) {
3648  		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3649  		r->res_ls->ls_local_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
3650  		r->res_ls->ls_local_ms.m_result = 0;
3651  		__receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms, true);
3652  	}
3653  
3654  	return error;
3655  }
3656  
3657  /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3658     MASTER_UNCERTAIN to force the next request on the rsb to confirm
3659     that the master is still correct. */
3660  
3661  static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3662  {
3663  	return send_common(r, lkb, DLM_MSG_UNLOCK);
3664  }
3665  
3666  static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3667  {
3668  	return send_common(r, lkb, DLM_MSG_CANCEL);
3669  }
3670  
3671  static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3672  {
3673  	struct dlm_message *ms;
3674  	struct dlm_mhandle *mh;
3675  	int to_nodeid, error;
3676  
3677  	to_nodeid = lkb->lkb_nodeid;
3678  
3679  	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3680  	if (error)
3681  		goto out;
3682  
3683  	send_args(r, lkb, ms);
3684  
3685  	ms->m_result = 0;
3686  
3687  	error = send_message(mh, ms, r->res_name, r->res_length);
3688   out:
3689  	return error;
3690  }
3691  
3692  static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3693  {
3694  	struct dlm_message *ms;
3695  	struct dlm_mhandle *mh;
3696  	int to_nodeid, error;
3697  
3698  	to_nodeid = lkb->lkb_nodeid;
3699  
3700  	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3701  	if (error)
3702  		goto out;
3703  
3704  	send_args(r, lkb, ms);
3705  
3706  	ms->m_bastmode = cpu_to_le32(mode);
3707  
3708  	error = send_message(mh, ms, r->res_name, r->res_length);
3709   out:
3710  	return error;
3711  }
3712  
3713  static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3714  {
3715  	struct dlm_message *ms;
3716  	struct dlm_mhandle *mh;
3717  	int to_nodeid, error;
3718  
3719  	to_nodeid = dlm_dir_nodeid(r);
3720  
3721  	add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3722  	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3723  	if (error)
3724  		goto fail;
3725  
3726  	send_args(r, lkb, ms);
3727  
3728  	error = send_message(mh, ms, r->res_name, r->res_length);
3729  	if (error)
3730  		goto fail;
3731  	return 0;
3732  
3733   fail:
3734  	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3735  	return error;
3736  }
3737  
3738  static int send_remove(struct dlm_rsb *r)
3739  {
3740  	struct dlm_message *ms;
3741  	struct dlm_mhandle *mh;
3742  	int to_nodeid, error;
3743  
3744  	to_nodeid = dlm_dir_nodeid(r);
3745  
3746  	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3747  	if (error)
3748  		goto out;
3749  
3750  	memcpy(ms->m_extra, r->res_name, r->res_length);
3751  	ms->m_hash = cpu_to_le32(r->res_hash);
3752  
3753  	error = send_message(mh, ms, r->res_name, r->res_length);
3754   out:
3755  	return error;
3756  }
3757  
3758  static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3759  			     int mstype, int rv)
3760  {
3761  	struct dlm_message *ms;
3762  	struct dlm_mhandle *mh;
3763  	int to_nodeid, error;
3764  
3765  	to_nodeid = lkb->lkb_nodeid;
3766  
3767  	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3768  	if (error)
3769  		goto out;
3770  
3771  	send_args(r, lkb, ms);
3772  
3773  	ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3774  
3775  	error = send_message(mh, ms, r->res_name, r->res_length);
3776   out:
3777  	return error;
3778  }
3779  
3780  static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3781  {
3782  	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3783  }
3784  
3785  static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3786  {
3787  	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3788  }
3789  
3790  static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3791  {
3792  	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3793  }
3794  
3795  static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3796  {
3797  	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3798  }
3799  
3800  static int send_lookup_reply(struct dlm_ls *ls,
3801  			     const struct dlm_message *ms_in, int ret_nodeid,
3802  			     int rv)
3803  {
3804  	struct dlm_rsb *r = &ls->ls_local_rsb;
3805  	struct dlm_message *ms;
3806  	struct dlm_mhandle *mh;
3807  	int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
3808  
3809  	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3810  	if (error)
3811  		goto out;
3812  
3813  	ms->m_lkid = ms_in->m_lkid;
3814  	ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3815  	ms->m_nodeid = cpu_to_le32(ret_nodeid);
3816  
3817  	error = send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in));
3818   out:
3819  	return error;
3820  }
3821  
3822  /* which args we save from a received message depends heavily on the type
3823     of message, unlike the send side where we can safely send everything about
3824     the lkb for any type of message */
3825  
3826  static void receive_flags(struct dlm_lkb *lkb, const struct dlm_message *ms)
3827  {
3828  	lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
3829  	dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3830  	dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3831  }
3832  
3833  static void receive_flags_reply(struct dlm_lkb *lkb,
3834  				const struct dlm_message *ms,
3835  				bool local)
3836  {
3837  	if (local)
3838  		return;
3839  
3840  	dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3841  	dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3842  }
3843  
3844  static int receive_extralen(const struct dlm_message *ms)
3845  {
3846  	return (le16_to_cpu(ms->m_header.h_length) -
3847  		sizeof(struct dlm_message));
3848  }
3849  
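/* A small worked note on the length bookkeeping, derived from
   create_message() and _create_message() above: the sender sets
   h_length = mb_len = sizeof(struct dlm_message) + extra, where "extra"
   is res_length for REQUEST/LOOKUP/REMOVE or ls_lvblen when an LVB is
   carried, so receive_extralen() recovers exactly that "extra" on the
   receiving side (e.g. the resource name length used by
   receive_request() and receive_remove()). */
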
3850  static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3851  		       const struct dlm_message *ms)
3852  {
3853  	int len;
3854  
3855  	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3856  		if (!lkb->lkb_lvbptr)
3857  			lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3858  		if (!lkb->lkb_lvbptr)
3859  			return -ENOMEM;
3860  		len = receive_extralen(ms);
3861  		if (len > ls->ls_lvblen)
3862  			len = ls->ls_lvblen;
3863  		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3864  	}
3865  	return 0;
3866  }
3867  
3868  static void fake_bastfn(void *astparam, int mode)
3869  {
3870  	log_print("fake_bastfn should not be called");
3871  }
3872  
3873  static void fake_astfn(void *astparam)
3874  {
3875  	log_print("fake_astfn should not be called");
3876  }
3877  
3878  static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3879  				const struct dlm_message *ms)
3880  {
3881  	lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3882  	lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
3883  	lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3884  	lkb->lkb_grmode = DLM_LOCK_IV;
3885  	lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3886  
3887  	lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;
3888  	lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL;
3889  
3890  	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3891  		/* lkb was just created so there won't be an lvb yet */
3892  		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3893  		if (!lkb->lkb_lvbptr)
3894  			return -ENOMEM;
3895  	}
3896  
3897  	return 0;
3898  }
3899  
3900  static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3901  				const struct dlm_message *ms)
3902  {
3903  	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3904  		return -EBUSY;
3905  
3906  	if (receive_lvb(ls, lkb, ms))
3907  		return -ENOMEM;
3908  
3909  	lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3910  	lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
3911  
3912  	return 0;
3913  }
3914  
3915  static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3916  			       const struct dlm_message *ms)
3917  {
3918  	if (receive_lvb(ls, lkb, ms))
3919  		return -ENOMEM;
3920  	return 0;
3921  }
3922  
3923  /* We fill in the local-lkb fields with the info that send_xxxx_reply()
3924     uses to send a reply and that the remote end uses to process the reply. */
3925  
3926  static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms)
3927  {
3928  	struct dlm_lkb *lkb = &ls->ls_local_lkb;
3929  	lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3930  	lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3931  }
3932  
3933  /* This is called after the rsb is locked so that we can safely inspect
3934     fields in the lkb. */
3935  
3936  static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
3937  {
3938  	int from = le32_to_cpu(ms->m_header.h_nodeid);
3939  	int error = 0;
3940  
3941  	/* currently mixing user and kernel locks is not supported */
3942  	if (ms->m_flags & cpu_to_le32(BIT(DLM_DFL_USER_BIT)) &&
3943  	    !test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
3944  		log_error(lkb->lkb_resource->res_ls,
3945  			  "got user dlm message for a kernel lock");
3946  		error = -EINVAL;
3947  		goto out;
3948  	}
3949  
3950  	switch (ms->m_type) {
3951  	case cpu_to_le32(DLM_MSG_CONVERT):
3952  	case cpu_to_le32(DLM_MSG_UNLOCK):
3953  	case cpu_to_le32(DLM_MSG_CANCEL):
3954  		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3955  			error = -EINVAL;
3956  		break;
3957  
3958  	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3959  	case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
3960  	case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
3961  	case cpu_to_le32(DLM_MSG_GRANT):
3962  	case cpu_to_le32(DLM_MSG_BAST):
3963  		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3964  			error = -EINVAL;
3965  		break;
3966  
3967  	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3968  		if (!is_process_copy(lkb))
3969  			error = -EINVAL;
3970  		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3971  			error = -EINVAL;
3972  		break;
3973  
3974  	default:
3975  		error = -EINVAL;
3976  	}
3977  
3978  out:
3979  	if (error)
3980  		log_error(lkb->lkb_resource->res_ls,
3981  			  "ignore invalid message %d from %d %x %x %x %d",
3982  			  le32_to_cpu(ms->m_type), from, lkb->lkb_id,
3983  			  lkb->lkb_remid, dlm_iflags_val(lkb),
3984  			  lkb->lkb_nodeid);
3985  	return error;
3986  }
3987  
3988  static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms)
3989  {
3990  	struct dlm_lkb *lkb;
3991  	struct dlm_rsb *r;
3992  	int from_nodeid;
3993  	int error, namelen = 0;
3994  
3995  	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3996  
3997  	error = create_lkb(ls, &lkb);
3998  	if (error)
3999  		goto fail;
4000  
4001  	receive_flags(lkb, ms);
4002  	set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
4003  	error = receive_request_args(ls, lkb, ms);
4004  	if (error) {
4005  		__put_lkb(ls, lkb);
4006  		goto fail;
4007  	}
4008  
4009  	/* The dir node is the authority on whether we are the master
4010  	   for this rsb or not, so if the master sends us a request, we should
4011  	   recreate the rsb if we've destroyed it.   This race happens when we
4012  	   send a remove message to the dir node at the same time that the dir
4013  	   node sends us a request for the rsb. */
4014  
4015  	namelen = receive_extralen(ms);
4016  
4017  	error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4018  			 R_RECEIVE_REQUEST, &r);
4019  	if (error) {
4020  		__put_lkb(ls, lkb);
4021  		goto fail;
4022  	}
4023  
4024  	lock_rsb(r);
4025  
4026  	if (r->res_master_nodeid != dlm_our_nodeid()) {
4027  		error = validate_master_nodeid(ls, r, from_nodeid);
4028  		if (error) {
4029  			unlock_rsb(r);
4030  			put_rsb(r);
4031  			__put_lkb(ls, lkb);
4032  			goto fail;
4033  		}
4034  	}
4035  
4036  	attach_lkb(r, lkb);
4037  	error = do_request(r, lkb);
4038  	send_request_reply(r, lkb, error);
4039  	do_request_effects(r, lkb, error);
4040  
4041  	unlock_rsb(r);
4042  	put_rsb(r);
4043  
4044  	if (error == -EINPROGRESS)
4045  		error = 0;
4046  	if (error)
4047  		dlm_put_lkb(lkb);
4048  	return 0;
4049  
4050   fail:
4051  	/* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4052  	   and do this receive_request again from process_lookup_list once
4053  	   we get the lookup reply.  This would avoid many repeated
4054  	   ENOTBLK request failures when the lookup reply designating us
4055  	   as master is delayed. */
4056  
4057  	if (error != -ENOTBLK) {
4058  		log_limit(ls, "receive_request %x from %d %d",
4059  			  le32_to_cpu(ms->m_lkid), from_nodeid, error);
4060  	}
4061  
4062  	setup_local_lkb(ls, ms);
4063  	send_request_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4064  	return error;
4065  }
4066  
4067  static int receive_convert(struct dlm_ls *ls, const struct dlm_message *ms)
4068  {
4069  	struct dlm_lkb *lkb;
4070  	struct dlm_rsb *r;
4071  	int error, reply = 1;
4072  
4073  	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4074  	if (error)
4075  		goto fail;
4076  
4077  	if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4078  		log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4079  			  "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4080  			  (unsigned long long)lkb->lkb_recover_seq,
4081  			  le32_to_cpu(ms->m_header.h_nodeid),
4082  			  le32_to_cpu(ms->m_lkid));
4083  		error = -ENOENT;
4084  		dlm_put_lkb(lkb);
4085  		goto fail;
4086  	}
4087  
4088  	r = lkb->lkb_resource;
4089  
4090  	hold_rsb(r);
4091  	lock_rsb(r);
4092  
4093  	error = validate_message(lkb, ms);
4094  	if (error)
4095  		goto out;
4096  
4097  	receive_flags(lkb, ms);
4098  
4099  	error = receive_convert_args(ls, lkb, ms);
4100  	if (error) {
4101  		send_convert_reply(r, lkb, error);
4102  		goto out;
4103  	}
4104  
4105  	reply = !down_conversion(lkb);
4106  
4107  	error = do_convert(r, lkb);
4108  	if (reply)
4109  		send_convert_reply(r, lkb, error);
4110  	do_convert_effects(r, lkb, error);
4111   out:
4112  	unlock_rsb(r);
4113  	put_rsb(r);
4114  	dlm_put_lkb(lkb);
4115  	return 0;
4116  
4117   fail:
4118  	setup_local_lkb(ls, ms);
4119  	send_convert_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4120  	return error;
4121  }
4122  
4123  static int receive_unlock(struct dlm_ls *ls, const struct dlm_message *ms)
4124  {
4125  	struct dlm_lkb *lkb;
4126  	struct dlm_rsb *r;
4127  	int error;
4128  
4129  	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4130  	if (error)
4131  		goto fail;
4132  
4133  	if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4134  		log_error(ls, "receive_unlock %x remid %x remote %d %x",
4135  			  lkb->lkb_id, lkb->lkb_remid,
4136  			  le32_to_cpu(ms->m_header.h_nodeid),
4137  			  le32_to_cpu(ms->m_lkid));
4138  		error = -ENOENT;
4139  		dlm_put_lkb(lkb);
4140  		goto fail;
4141  	}
4142  
4143  	r = lkb->lkb_resource;
4144  
4145  	hold_rsb(r);
4146  	lock_rsb(r);
4147  
4148  	error = validate_message(lkb, ms);
4149  	if (error)
4150  		goto out;
4151  
4152  	receive_flags(lkb, ms);
4153  
4154  	error = receive_unlock_args(ls, lkb, ms);
4155  	if (error) {
4156  		send_unlock_reply(r, lkb, error);
4157  		goto out;
4158  	}
4159  
4160  	error = do_unlock(r, lkb);
4161  	send_unlock_reply(r, lkb, error);
4162  	do_unlock_effects(r, lkb, error);
4163   out:
4164  	unlock_rsb(r);
4165  	put_rsb(r);
4166  	dlm_put_lkb(lkb);
4167  	return 0;
4168  
4169   fail:
4170  	setup_local_lkb(ls, ms);
4171  	send_unlock_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4172  	return error;
4173  }
4174  
4175  static int receive_cancel(struct dlm_ls *ls, const struct dlm_message *ms)
4176  {
4177  	struct dlm_lkb *lkb;
4178  	struct dlm_rsb *r;
4179  	int error;
4180  
4181  	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4182  	if (error)
4183  		goto fail;
4184  
4185  	receive_flags(lkb, ms);
4186  
4187  	r = lkb->lkb_resource;
4188  
4189  	hold_rsb(r);
4190  	lock_rsb(r);
4191  
4192  	error = validate_message(lkb, ms);
4193  	if (error)
4194  		goto out;
4195  
4196  	error = do_cancel(r, lkb);
4197  	send_cancel_reply(r, lkb, error);
4198  	do_cancel_effects(r, lkb, error);
4199   out:
4200  	unlock_rsb(r);
4201  	put_rsb(r);
4202  	dlm_put_lkb(lkb);
4203  	return 0;
4204  
4205   fail:
4206  	setup_local_lkb(ls, ms);
4207  	send_cancel_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4208  	return error;
4209  }
4210  
4211  static int receive_grant(struct dlm_ls *ls, const struct dlm_message *ms)
4212  {
4213  	struct dlm_lkb *lkb;
4214  	struct dlm_rsb *r;
4215  	int error;
4216  
4217  	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4218  	if (error)
4219  		return error;
4220  
4221  	r = lkb->lkb_resource;
4222  
4223  	hold_rsb(r);
4224  	lock_rsb(r);
4225  
4226  	error = validate_message(lkb, ms);
4227  	if (error)
4228  		goto out;
4229  
4230  	receive_flags_reply(lkb, ms, false);
4231  	if (is_altmode(lkb))
4232  		munge_altmode(lkb, ms);
4233  	grant_lock_pc(r, lkb, ms);
4234  	queue_cast(r, lkb, 0);
4235   out:
4236  	unlock_rsb(r);
4237  	put_rsb(r);
4238  	dlm_put_lkb(lkb);
4239  	return 0;
4240  }
4241  
4242  static int receive_bast(struct dlm_ls *ls, const struct dlm_message *ms)
4243  {
4244  	struct dlm_lkb *lkb;
4245  	struct dlm_rsb *r;
4246  	int error;
4247  
4248  	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4249  	if (error)
4250  		return error;
4251  
4252  	r = lkb->lkb_resource;
4253  
4254  	hold_rsb(r);
4255  	lock_rsb(r);
4256  
4257  	error = validate_message(lkb, ms);
4258  	if (error)
4259  		goto out;
4260  
4261  	queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode));
4262  	lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
4263   out:
4264  	unlock_rsb(r);
4265  	put_rsb(r);
4266  	dlm_put_lkb(lkb);
4267  	return 0;
4268  }
4269  
4270  static void receive_lookup(struct dlm_ls *ls, const struct dlm_message *ms)
4271  {
4272  	int len, error, ret_nodeid, from_nodeid, our_nodeid;
4273  
4274  	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4275  	our_nodeid = dlm_our_nodeid();
4276  
4277  	len = receive_extralen(ms);
4278  
4279  	error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4280  				  &ret_nodeid, NULL);
4281  
4282  	/* Optimization: we're master so treat lookup as a request */
4283  	if (!error && ret_nodeid == our_nodeid) {
4284  		receive_request(ls, ms);
4285  		return;
4286  	}
4287  	send_lookup_reply(ls, ms, ret_nodeid, error);
4288  }
4289  
4290  static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
4291  {
4292  	char name[DLM_RESNAME_MAXLEN+1];
4293  	struct dlm_rsb *r;
4294  	int rv, len, dir_nodeid, from_nodeid;
4295  
4296  	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4297  
4298  	len = receive_extralen(ms);
4299  
4300  	if (len > DLM_RESNAME_MAXLEN) {
4301  		log_error(ls, "receive_remove from %d bad len %d",
4302  			  from_nodeid, len);
4303  		return;
4304  	}
4305  
4306  	dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash));
4307  	if (dir_nodeid != dlm_our_nodeid()) {
4308  		log_error(ls, "receive_remove from %d bad nodeid %d",
4309  			  from_nodeid, dir_nodeid);
4310  		return;
4311  	}
4312  
4313  	/*
4314  	 * Look for an inactive rsb; if it's there, free it.
4315  	 * If the rsb is active, it's being used, and we should ignore this
4316  	 * message.  This is an expected race between the dir node sending a
4317  	 * request to the master node at the same time as the master node sends
4318  	 * a remove to the dir node.  The resolution to that race is for the
4319  	 * dir node to ignore the remove message, and the master node to
4320  	 * recreate the master rsb when it gets a request from the dir node for
4321  	 * an rsb it doesn't have.
4322  	 */
4323  
4324  	memset(name, 0, sizeof(name));
4325  	memcpy(name, ms->m_extra, len);
4326  
4327  	rcu_read_lock();
4328  	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
4329  	if (rv) {
4330  		rcu_read_unlock();
4331  		/* should not happen */
4332  		log_error(ls, "%s from %d not found %s", __func__,
4333  			  from_nodeid, name);
4334  		return;
4335  	}
4336  
4337  	write_lock_bh(&ls->ls_rsbtbl_lock);
4338  	if (!rsb_flag(r, RSB_HASHED)) {
4339  		rcu_read_unlock();
4340  		write_unlock_bh(&ls->ls_rsbtbl_lock);
4341  		/* should not happen */
4342  		log_error(ls, "%s from %d got removed during removal %s",
4343  			  __func__, from_nodeid, name);
4344  		return;
4345  	}
4346  	/* at this stage the rsb can only be freed here */
4347  	rcu_read_unlock();
4348  
4349  	if (!rsb_flag(r, RSB_INACTIVE)) {
4350  		if (r->res_master_nodeid != from_nodeid) {
4351  			/* should not happen */
4352  			log_error(ls, "receive_remove on active rsb from %d master %d",
4353  				  from_nodeid, r->res_master_nodeid);
4354  			dlm_print_rsb(r);
4355  			write_unlock_bh(&ls->ls_rsbtbl_lock);
4356  			return;
4357  		}
4358  
4359  		/* Ignore the remove message, see race comment above. */
4360  
4361  		log_debug(ls, "receive_remove from %d master %d first %x %s",
4362  			  from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4363  			  name);
4364  		write_unlock_bh(&ls->ls_rsbtbl_lock);
4365  		return;
4366  	}
4367  
4368  	if (r->res_master_nodeid != from_nodeid) {
4369  		log_error(ls, "receive_remove inactive from %d master %d",
4370  			  from_nodeid, r->res_master_nodeid);
4371  		dlm_print_rsb(r);
4372  		write_unlock_bh(&ls->ls_rsbtbl_lock);
4373  		return;
4374  	}
4375  
4376  	list_del(&r->res_slow_list);
4377  	rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
4378  			       dlm_rhash_rsb_params);
4379  	rsb_clear_flag(r, RSB_HASHED);
4380  	write_unlock_bh(&ls->ls_rsbtbl_lock);
4381  
4382  	free_inactive_rsb(r);
4383  }
4384  
4385  static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms)
4386  {
4387  	do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
4388  }
4389  
4390  static int receive_request_reply(struct dlm_ls *ls,
4391  				 const struct dlm_message *ms)
4392  {
4393  	struct dlm_lkb *lkb;
4394  	struct dlm_rsb *r;
4395  	int error, mstype, result;
4396  	int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4397  
4398  	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4399  	if (error)
4400  		return error;
4401  
4402  	r = lkb->lkb_resource;
4403  	hold_rsb(r);
4404  	lock_rsb(r);
4405  
4406  	error = validate_message(lkb, ms);
4407  	if (error)
4408  		goto out;
4409  
4410  	mstype = lkb->lkb_wait_type;
4411  	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4412  	if (error) {
4413  		log_error(ls, "receive_request_reply %x remote %d %x result %d",
4414  			  lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid),
4415  			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4416  		dlm_dump_rsb(r);
4417  		goto out;
4418  	}
4419  
4420  	/* Optimization: the dir node was also the master, so it took our
4421  	   lookup as a request and sent request reply instead of lookup reply */
4422  	if (mstype == DLM_MSG_LOOKUP) {
4423  		r->res_master_nodeid = from_nodeid;
4424  		r->res_nodeid = from_nodeid;
4425  		lkb->lkb_nodeid = from_nodeid;
4426  	}
4427  
4428  	/* this is the value returned from do_request() on the master */
4429  	result = from_dlm_errno(le32_to_cpu(ms->m_result));
4430  
4431  	switch (result) {
4432  	case -EAGAIN:
4433  		/* request would block (be queued) on remote master */
4434  		queue_cast(r, lkb, -EAGAIN);
4435  		confirm_master(r, -EAGAIN);
4436  		unhold_lkb(lkb); /* undoes create_lkb() */
4437  		break;
4438  
4439  	case -EINPROGRESS:
4440  	case 0:
4441  		/* request was queued or granted on remote master */
4442  		receive_flags_reply(lkb, ms, false);
4443  		lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
4444  		if (is_altmode(lkb))
4445  			munge_altmode(lkb, ms);
4446  		if (result) {
4447  			add_lkb(r, lkb, DLM_LKSTS_WAITING);
4448  		} else {
4449  			grant_lock_pc(r, lkb, ms);
4450  			queue_cast(r, lkb, 0);
4451  		}
4452  		confirm_master(r, result);
4453  		break;
4454  
4455  	case -EBADR:
4456  	case -ENOTBLK:
4457  		/* find_rsb failed to find rsb or rsb wasn't master */
4458  		log_limit(ls, "receive_request_reply %x from %d %d "
4459  			  "master %d dir %d first %x %s", lkb->lkb_id,
4460  			  from_nodeid, result, r->res_master_nodeid,
4461  			  r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4462  
4463  		if (r->res_dir_nodeid != dlm_our_nodeid() &&
4464  		    r->res_master_nodeid != dlm_our_nodeid()) {
4465  			/* cause _request_lock->set_master->send_lookup */
4466  			r->res_master_nodeid = 0;
4467  			r->res_nodeid = -1;
4468  			lkb->lkb_nodeid = -1;
4469  		}
4470  
4471  		if (is_overlap(lkb)) {
4472  			/* we'll ignore error in cancel/unlock reply */
4473  			queue_cast_overlap(r, lkb);
4474  			confirm_master(r, result);
4475  			unhold_lkb(lkb); /* undoes create_lkb() */
4476  		} else {
4477  			_request_lock(r, lkb);
4478  
4479  			if (r->res_master_nodeid == dlm_our_nodeid())
4480  				confirm_master(r, 0);
4481  		}
4482  		break;
4483  
4484  	default:
4485  		log_error(ls, "receive_request_reply %x error %d",
4486  			  lkb->lkb_id, result);
4487  	}
4488  
4489  	if ((result == 0 || result == -EINPROGRESS) &&
4490  	    test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
4491  		log_debug(ls, "receive_request_reply %x result %d unlock",
4492  			  lkb->lkb_id, result);
4493  		clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4494  		send_unlock(r, lkb);
4495  	} else if ((result == -EINPROGRESS) &&
4496  		   test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
4497  				      &lkb->lkb_iflags)) {
4498  		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4499  		clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4500  		send_cancel(r, lkb);
4501  	} else {
4502  		clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4503  		clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4504  	}
4505   out:
4506  	unlock_rsb(r);
4507  	put_rsb(r);
4508  	dlm_put_lkb(lkb);
4509  	return 0;
4510  }
4511  
4512  static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4513  				    const struct dlm_message *ms, bool local)
4514  {
4515  	/* this is the value returned from do_convert() on the master */
4516  	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4517  	case -EAGAIN:
4518  		/* convert would block (be queued) on remote master */
4519  		queue_cast(r, lkb, -EAGAIN);
4520  		break;
4521  
4522  	case -EDEADLK:
4523  		receive_flags_reply(lkb, ms, local);
4524  		revert_lock_pc(r, lkb);
4525  		queue_cast(r, lkb, -EDEADLK);
4526  		break;
4527  
4528  	case -EINPROGRESS:
4529  		/* convert was queued on remote master */
4530  		receive_flags_reply(lkb, ms, local);
4531  		if (is_demoted(lkb))
4532  			munge_demoted(lkb);
4533  		del_lkb(r, lkb);
4534  		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4535  		break;
4536  
4537  	case 0:
4538  		/* convert was granted on remote master */
4539  		receive_flags_reply(lkb, ms, local);
4540  		if (is_demoted(lkb))
4541  			munge_demoted(lkb);
4542  		grant_lock_pc(r, lkb, ms);
4543  		queue_cast(r, lkb, 0);
4544  		break;
4545  
4546  	default:
4547  		log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4548  			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4549  			  le32_to_cpu(ms->m_lkid),
4550  			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4551  		dlm_print_rsb(r);
4552  		dlm_print_lkb(lkb);
4553  	}
4554  }
4555  
4556  static void _receive_convert_reply(struct dlm_lkb *lkb,
4557  				   const struct dlm_message *ms, bool local)
4558  {
4559  	struct dlm_rsb *r = lkb->lkb_resource;
4560  	int error;
4561  
4562  	hold_rsb(r);
4563  	lock_rsb(r);
4564  
4565  	error = validate_message(lkb, ms);
4566  	if (error)
4567  		goto out;
4568  
4569  	error = remove_from_waiters_ms(lkb, ms, local);
4570  	if (error)
4571  		goto out;
4572  
4573  	__receive_convert_reply(r, lkb, ms, local);
4574   out:
4575  	unlock_rsb(r);
4576  	put_rsb(r);
4577  }
4578  
4579  static int receive_convert_reply(struct dlm_ls *ls,
4580  				 const struct dlm_message *ms)
4581  {
4582  	struct dlm_lkb *lkb;
4583  	int error;
4584  
4585  	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4586  	if (error)
4587  		return error;
4588  
4589  	_receive_convert_reply(lkb, ms, false);
4590  	dlm_put_lkb(lkb);
4591  	return 0;
4592  }
4593  
4594  static void _receive_unlock_reply(struct dlm_lkb *lkb,
4595  				  const struct dlm_message *ms, bool local)
4596  {
4597  	struct dlm_rsb *r = lkb->lkb_resource;
4598  	int error;
4599  
4600  	hold_rsb(r);
4601  	lock_rsb(r);
4602  
4603  	error = validate_message(lkb, ms);
4604  	if (error)
4605  		goto out;
4606  
4607  	error = remove_from_waiters_ms(lkb, ms, local);
4608  	if (error)
4609  		goto out;
4610  
4611  	/* this is the value returned from do_unlock() on the master */
4612  
4613  	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4614  	case -DLM_EUNLOCK:
4615  		receive_flags_reply(lkb, ms, local);
4616  		remove_lock_pc(r, lkb);
4617  		queue_cast(r, lkb, -DLM_EUNLOCK);
4618  		break;
4619  	case -ENOENT:
4620  		break;
4621  	default:
4622  		log_error(r->res_ls, "receive_unlock_reply %x error %d",
4623  			  lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
4624  	}
4625   out:
4626  	unlock_rsb(r);
4627  	put_rsb(r);
4628  }
4629  
4630  static int receive_unlock_reply(struct dlm_ls *ls,
4631  				const struct dlm_message *ms)
4632  {
4633  	struct dlm_lkb *lkb;
4634  	int error;
4635  
4636  	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4637  	if (error)
4638  		return error;
4639  
4640  	_receive_unlock_reply(lkb, ms, false);
4641  	dlm_put_lkb(lkb);
4642  	return 0;
4643  }
4644  
4645  static void _receive_cancel_reply(struct dlm_lkb *lkb,
4646  				  const struct dlm_message *ms, bool local)
4647  {
4648  	struct dlm_rsb *r = lkb->lkb_resource;
4649  	int error;
4650  
4651  	hold_rsb(r);
4652  	lock_rsb(r);
4653  
4654  	error = validate_message(lkb, ms);
4655  	if (error)
4656  		goto out;
4657  
4658  	error = remove_from_waiters_ms(lkb, ms, local);
4659  	if (error)
4660  		goto out;
4661  
4662  	/* this is the value returned from do_cancel() on the master */
4663  
4664  	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4665  	case -DLM_ECANCEL:
4666  		receive_flags_reply(lkb, ms, local);
4667  		revert_lock_pc(r, lkb);
4668  		queue_cast(r, lkb, -DLM_ECANCEL);
4669  		break;
4670  	case 0:
4671  		break;
4672  	default:
4673  		log_error(r->res_ls, "receive_cancel_reply %x error %d",
4674  			  lkb->lkb_id,
4675  			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4676  	}
4677   out:
4678  	unlock_rsb(r);
4679  	put_rsb(r);
4680  }
4681  
4682  static int receive_cancel_reply(struct dlm_ls *ls,
4683  				const struct dlm_message *ms)
4684  {
4685  	struct dlm_lkb *lkb;
4686  	int error;
4687  
4688  	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4689  	if (error)
4690  		return error;
4691  
4692  	_receive_cancel_reply(lkb, ms, false);
4693  	dlm_put_lkb(lkb);
4694  	return 0;
4695  }
4696  
4697  static void receive_lookup_reply(struct dlm_ls *ls,
4698  				 const struct dlm_message *ms)
4699  {
4700  	struct dlm_lkb *lkb;
4701  	struct dlm_rsb *r;
4702  	int error, ret_nodeid;
4703  	int do_lookup_list = 0;
4704  
4705  	error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
4706  	if (error) {
4707  		log_error(ls, "%s no lkid %x", __func__,
4708  			  le32_to_cpu(ms->m_lkid));
4709  		return;
4710  	}
4711  
4712  	/* ms->m_result is the value returned by dlm_master_lookup on dir node
4713  	   FIXME: will a non-zero error ever be returned? */
4714  
4715  	r = lkb->lkb_resource;
4716  	hold_rsb(r);
4717  	lock_rsb(r);
4718  
4719  	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4720  	if (error)
4721  		goto out;
4722  
4723  	ret_nodeid = le32_to_cpu(ms->m_nodeid);
4724  
4725  	/* We sometimes receive a request from the dir node for this
4726  	   rsb before we've received the dir node's lookup_reply for it.
4727  	   The request from the dir node implies we're the master, so we set
4728  	   ourself as master in receive_request_reply, and verify here that
4729  	   we are indeed the master. */
4730  
4731  	if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4732  		/* This should never happen */
4733  		log_error(ls, "receive_lookup_reply %x from %d ret %d "
4734  			  "master %d dir %d our %d first %x %s",
4735  			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4736  			  ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
4737  			  dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4738  	}
4739  
4740  	if (ret_nodeid == dlm_our_nodeid()) {
4741  		r->res_master_nodeid = ret_nodeid;
4742  		r->res_nodeid = 0;
4743  		do_lookup_list = 1;
4744  		r->res_first_lkid = 0;
4745  	} else if (ret_nodeid == -1) {
4746  		/* the remote node doesn't believe it's the dir node */
4747  		log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4748  			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
4749  		r->res_master_nodeid = 0;
4750  		r->res_nodeid = -1;
4751  		lkb->lkb_nodeid = -1;
4752  	} else {
4753  		/* set_master() will set lkb_nodeid from r */
4754  		r->res_master_nodeid = ret_nodeid;
4755  		r->res_nodeid = ret_nodeid;
4756  	}
4757  
4758  	if (is_overlap(lkb)) {
4759  		log_debug(ls, "receive_lookup_reply %x unlock %x",
4760  			  lkb->lkb_id, dlm_iflags_val(lkb));
4761  		queue_cast_overlap(r, lkb);
4762  		unhold_lkb(lkb); /* undoes create_lkb() */
4763  		goto out_list;
4764  	}
4765  
4766  	_request_lock(r, lkb);
4767  
4768   out_list:
4769  	if (do_lookup_list)
4770  		process_lookup_list(r);
4771   out:
4772  	unlock_rsb(r);
4773  	put_rsb(r);
4774  	dlm_put_lkb(lkb);
4775  }
4776  
4777  static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4778  			     uint32_t saved_seq)
4779  {
4780  	int error = 0, noent = 0;
4781  
4782  	if (WARN_ON_ONCE(!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid)))) {
4783  		log_limit(ls, "receive %d from non-member %d %x %x %d",
4784  			  le32_to_cpu(ms->m_type),
4785  			  le32_to_cpu(ms->m_header.h_nodeid),
4786  			  le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4787  			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4788  		return;
4789  	}
4790  
4791  	switch (ms->m_type) {
4792  
4793  	/* messages sent to a master node */
4794  
4795  	case cpu_to_le32(DLM_MSG_REQUEST):
4796  		error = receive_request(ls, ms);
4797  		break;
4798  
4799  	case cpu_to_le32(DLM_MSG_CONVERT):
4800  		error = receive_convert(ls, ms);
4801  		break;
4802  
4803  	case cpu_to_le32(DLM_MSG_UNLOCK):
4804  		error = receive_unlock(ls, ms);
4805  		break;
4806  
4807  	case cpu_to_le32(DLM_MSG_CANCEL):
4808  		noent = 1;
4809  		error = receive_cancel(ls, ms);
4810  		break;
4811  
4812  	/* messages sent from a master node (replies to above) */
4813  
4814  	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
4815  		error = receive_request_reply(ls, ms);
4816  		break;
4817  
4818  	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
4819  		error = receive_convert_reply(ls, ms);
4820  		break;
4821  
4822  	case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
4823  		error = receive_unlock_reply(ls, ms);
4824  		break;
4825  
4826  	case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
4827  		error = receive_cancel_reply(ls, ms);
4828  		break;
4829  
4830  	/* messages sent from a master node (only two types of async msg) */
4831  
4832  	case cpu_to_le32(DLM_MSG_GRANT):
4833  		noent = 1;
4834  		error = receive_grant(ls, ms);
4835  		break;
4836  
4837  	case cpu_to_le32(DLM_MSG_BAST):
4838  		noent = 1;
4839  		error = receive_bast(ls, ms);
4840  		break;
4841  
4842  	/* messages sent to a dir node */
4843  
4844  	case cpu_to_le32(DLM_MSG_LOOKUP):
4845  		receive_lookup(ls, ms);
4846  		break;
4847  
4848  	case cpu_to_le32(DLM_MSG_REMOVE):
4849  		receive_remove(ls, ms);
4850  		break;
4851  
4852  	/* messages sent from a dir node (remove has no reply) */
4853  
4854  	case cpu_to_le32(DLM_MSG_LOOKUP_REPLY):
4855  		receive_lookup_reply(ls, ms);
4856  		break;
4857  
4858  	/* other messages */
4859  
4860  	case cpu_to_le32(DLM_MSG_PURGE):
4861  		receive_purge(ls, ms);
4862  		break;
4863  
4864  	default:
4865  		log_error(ls, "unknown message type %d",
4866  			  le32_to_cpu(ms->m_type));
4867  	}
4868  
4869  	/*
4870  	 * When checking for ENOENT, we're checking the result of
4871  	 * find_lkb(m_remid):
4872  	 *
4873  	 * The lock id referenced in the message wasn't found.  This may
4874  	 * happen in normal usage for the async messages and cancel, so
4875  	 * only use log_debug for them.
4876  	 *
4877  	 * Some errors are expected and normal.
4878  	 */
4879  
4880  	if (error == -ENOENT && noent) {
4881  		log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4882  			  le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4883  			  le32_to_cpu(ms->m_header.h_nodeid),
4884  			  le32_to_cpu(ms->m_lkid), saved_seq);
4885  	} else if (error == -ENOENT) {
4886  		log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4887  			  le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4888  			  le32_to_cpu(ms->m_header.h_nodeid),
4889  			  le32_to_cpu(ms->m_lkid), saved_seq);
4890  
4891  		if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT))
4892  			dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash));
4893  	}
4894  
4895  	if (error == -EINVAL) {
4896  		log_error(ls, "receive %d inval from %d lkid %x remid %x "
4897  			  "saved_seq %u",
4898  			  le32_to_cpu(ms->m_type),
4899  			  le32_to_cpu(ms->m_header.h_nodeid),
4900  			  le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4901  			  saved_seq);
4902  	}
4903  }
4904  
4905  /* If the lockspace is in recovery mode (locking stopped), then normal
4906     messages are saved on the requestqueue for processing after recovery is
4907     done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4908     messages off the requestqueue before we process new ones. This occurs right
4909     after recovery completes when we transition from saving all messages on
4910     requestqueue, to processing all the saved messages, to processing new
4911     messages as they arrive. */
4912  
4913  static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4914  				int nodeid)
4915  {
4916  try_again:
4917  	read_lock_bh(&ls->ls_requestqueue_lock);
4918  	if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4919  		/* If we were a member of this lockspace, left, and rejoined,
4920  		   other nodes may still be sending us messages from the
4921  		   lockspace generation before we left. */
4922  		if (WARN_ON_ONCE(!ls->ls_generation)) {
4923  			read_unlock_bh(&ls->ls_requestqueue_lock);
4924  			log_limit(ls, "receive %d from %d ignore old gen",
4925  				  le32_to_cpu(ms->m_type), nodeid);
4926  			return;
4927  		}
4928  
4929  		read_unlock_bh(&ls->ls_requestqueue_lock);
4930  		write_lock_bh(&ls->ls_requestqueue_lock);
4931  		/* recheck because we hold writelock now */
4932  		if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4933  			write_unlock_bh(&ls->ls_requestqueue_lock);
4934  			goto try_again;
4935  		}
4936  
4937  		dlm_add_requestqueue(ls, nodeid, ms);
4938  		write_unlock_bh(&ls->ls_requestqueue_lock);
4939  	} else {
4940  		_receive_message(ls, ms, 0);
4941  		read_unlock_bh(&ls->ls_requestqueue_lock);
4942  	}
4943  }
4944  
4945  /* This is called by dlm_recoverd to process messages that were saved on
4946     the requestqueue. */
4947  
4948  void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
4949  			       uint32_t saved_seq)
4950  {
4951  	_receive_message(ls, ms, saved_seq);
4952  }
4953  
4954  /* This is called by the midcomms layer when something is received for
4955     the lockspace.  It could be either a MSG (normal message sent as part of
4956     standard locking activity) or an RCOM (recovery message sent as part of
4957     lockspace recovery). */
4958  
4959  void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
4960  {
4961  	const struct dlm_header *hd = &p->header;
4962  	struct dlm_ls *ls;
4963  	int type = 0;
4964  
4965  	switch (hd->h_cmd) {
4966  	case DLM_MSG:
4967  		type = le32_to_cpu(p->message.m_type);
4968  		break;
4969  	case DLM_RCOM:
4970  		type = le32_to_cpu(p->rcom.rc_type);
4971  		break;
4972  	default:
4973  		log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4974  		return;
4975  	}
4976  
4977  	if (le32_to_cpu(hd->h_nodeid) != nodeid) {
4978  		log_print("invalid h_nodeid %d from %d lockspace %x",
4979  			  le32_to_cpu(hd->h_nodeid), nodeid,
4980  			  le32_to_cpu(hd->u.h_lockspace));
4981  		return;
4982  	}
4983  
4984  	ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace));
4985  	if (!ls) {
4986  		if (dlm_config.ci_log_debug) {
4987  			printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
4988  				"%u from %d cmd %d type %d\n",
4989  				le32_to_cpu(hd->u.h_lockspace), nodeid,
4990  				hd->h_cmd, type);
4991  		}
4992  
4993  		if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4994  			dlm_send_ls_not_ready(nodeid, &p->rcom);
4995  		return;
4996  	}
4997  
4998  	/* this rwlock allows dlm_ls_stop() to wait for all dlm_recv threads to
4999  	   be inactive (in this ls) before transitioning to recovery mode */
5000  
5001  	read_lock_bh(&ls->ls_recv_active);
5002  	if (hd->h_cmd == DLM_MSG)
5003  		dlm_receive_message(ls, &p->message, nodeid);
5004  	else if (hd->h_cmd == DLM_RCOM)
5005  		dlm_receive_rcom(ls, &p->rcom, nodeid);
5006  	else
5007  		log_error(ls, "invalid h_cmd %d from %d lockspace %x",
5008  			  hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
5009  	read_unlock_bh(&ls->ls_recv_active);
5010  
5011  	dlm_put_lockspace(ls);
5012  }
5013  
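/* Called from dlm_recover_waiters_pre() for a convert that was waiting on a
   reply from a failed or changed master.  A conversion caught between PR and
   CW gets a faked -EINPROGRESS reply to settle its state; other
   up-conversions are simply flagged for resend after recovery. */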
5014  static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
5015  				   struct dlm_message *ms_local)
5016  {
5017  	if (middle_conversion(lkb)) {
5018  		log_rinfo(ls, "%s %x middle convert in progress", __func__,
5019  			 lkb->lkb_id);
5020  
5021  		/* We sent this lock to the new master. The new master will
5022  		 * tell us when it's granted.  We no longer need a reply, so
5023  		 * use a fake reply to put the lkb into the right state.
5024  		 */
5025  		hold_lkb(lkb);
5026  		memset(ms_local, 0, sizeof(struct dlm_message));
5027  		ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
5028  		ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
5029  		ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5030  		_receive_convert_reply(lkb, ms_local, true);
5031  		unhold_lkb(lkb);
5032  
5033  	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
5034  		set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5035  	}
5036  
5037  	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5038  	   conversions are async; there's no reply from the remote master */
5039  }
5040  
5041  /* A waiting lkb needs recovery if the master node has failed, or
5042     the master node is changing (only when no directory is used) */
5043  
5044  static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5045  				 int dir_nodeid)
5046  {
5047  	if (dlm_no_directory(ls))
5048  		return 1;
5049  
5050  	if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5051  		return 1;
5052  
5053  	return 0;
5054  }
5055  
5056  /* Recovery for locks that are waiting for replies from nodes that are now
5057     gone.  We can just complete unlocks and cancels by faking a reply from the
5058     dead node.  Requests and up-conversions we flag to be resent after
5059     recovery.  Down-conversions can just be completed with a fake reply like
5060     unlocks.  Conversions between PR and CW need special attention. */
5061  
5062  void dlm_recover_waiters_pre(struct dlm_ls *ls)
5063  {
5064  	struct dlm_lkb *lkb, *safe;
5065  	struct dlm_message *ms_local;
5066  	int wait_type, local_unlock_result, local_cancel_result;
5067  	int dir_nodeid;
5068  
5069  	ms_local = kmalloc(sizeof(*ms_local), GFP_KERNEL);
5070  	if (!ms_local)
5071  		return;
5072  
5073  	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5074  
5075  		dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5076  
5077  		/* exclude debug messages about unlocks because there can be so
5078  		   many and they aren't very interesting */
5079  
5080  		if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5081  			log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5082  				  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5083  				  lkb->lkb_id,
5084  				  lkb->lkb_remid,
5085  				  lkb->lkb_wait_type,
5086  				  lkb->lkb_resource->res_nodeid,
5087  				  lkb->lkb_nodeid,
5088  				  lkb->lkb_wait_nodeid,
5089  				  dir_nodeid);
5090  		}
5091  
5092  		/* all outstanding lookups, regardless of destination, will be
5093  		   resent after recovery is done */
5094  
5095  		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5096  			set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5097  			continue;
5098  		}
5099  
5100  		if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5101  			continue;
5102  
5103  		wait_type = lkb->lkb_wait_type;
5104  		local_unlock_result = -DLM_EUNLOCK;
5105  		local_cancel_result = -DLM_ECANCEL;
5106  
5107  		/* Main reply may have been received leaving a zero wait_type,
5108  		   but a reply for the overlapping op may not have been
5109  		   received.  In that case we need to fake the appropriate
5110  		   reply for the overlap op. */
5111  
5112  		if (!wait_type) {
5113  			if (is_overlap_cancel(lkb)) {
5114  				wait_type = DLM_MSG_CANCEL;
5115  				if (lkb->lkb_grmode == DLM_LOCK_IV)
5116  					local_cancel_result = 0;
5117  			}
5118  			if (is_overlap_unlock(lkb)) {
5119  				wait_type = DLM_MSG_UNLOCK;
5120  				if (lkb->lkb_grmode == DLM_LOCK_IV)
5121  					local_unlock_result = -ENOENT;
5122  			}
5123  
5124  			log_debug(ls, "rwpre overlap %x %x %d %d %d",
5125  				  lkb->lkb_id, dlm_iflags_val(lkb), wait_type,
5126  				  local_cancel_result, local_unlock_result);
5127  		}
5128  
5129  		switch (wait_type) {
5130  
5131  		case DLM_MSG_REQUEST:
5132  			set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5133  			break;
5134  
5135  		case DLM_MSG_CONVERT:
5136  			recover_convert_waiter(ls, lkb, ms_local);
5137  			break;
5138  
5139  		case DLM_MSG_UNLOCK:
5140  			hold_lkb(lkb);
5141  			memset(ms_local, 0, sizeof(struct dlm_message));
5142  			ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
5143  			ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result));
5144  			ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5145  			_receive_unlock_reply(lkb, ms_local, true);
5146  			dlm_put_lkb(lkb);
5147  			break;
5148  
5149  		case DLM_MSG_CANCEL:
5150  			hold_lkb(lkb);
5151  			memset(ms_local, 0, sizeof(struct dlm_message));
5152  			ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
5153  			ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result));
5154  			ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5155  			_receive_cancel_reply(lkb, ms_local, true);
5156  			dlm_put_lkb(lkb);
5157  			break;
5158  
5159  		default:
5160  			log_error(ls, "invalid lkb wait_type %d %d",
5161  				  lkb->lkb_wait_type, wait_type);
5162  		}
5163  		schedule();
5164  	}
5165  	kfree(ms_local);
5166  }
5167  
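/* Return the first lkb on the waiters list that dlm_recover_waiters_pre()
   flagged for resend, with a reference held (hold_lkb); the caller drops the
   reference with dlm_put_lkb() when it is done reprocessing the lkb. */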
5168  static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5169  {
5170  	struct dlm_lkb *lkb = NULL, *iter;
5171  
5172  	spin_lock_bh(&ls->ls_waiters_lock);
5173  	list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
5174  		if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
5175  			hold_lkb(iter);
5176  			lkb = iter;
5177  			break;
5178  		}
5179  	}
5180  	spin_unlock_bh(&ls->ls_waiters_lock);
5181  
5182  	return lkb;
5183  }
5184  
5185  /*
5186   * Forced state reset for locks that were in the middle of remote operations
5187   * when recovery happened (i.e. lkbs that were on the waiters list, waiting
5188   * for a reply from a remote operation.)  The lkbs remaining on the waiters
5189   * list need to be reevaluated; some may need resending to a different node
5190   * than previously, and some may now need local handling rather than remote.
5191   *
5192   * First, the lkb state for the voided remote operation is forcibly reset,
5193   * equivalent to what remove_from_waiters() would normally do:
5194   * . lkb removed from ls_waiters list
5195   * . lkb wait_type cleared
5196   * . lkb waiters_count cleared
5197   * . lkb ref count decremented for each waiters_count (almost always 1,
5198   *   but possibly 2 in case of cancel/unlock overlapping, which means
5199   *   two remote replies were being expected for the lkb.)
5200   *
5201   * Second, the lkb is reprocessed like an original operation would be,
5202   * by passing it to _request_lock or _convert_lock, which will either
5203   * process the lkb operation locally, or send it to a remote node again
5204   * and put the lkb back onto the waiters list.
5205   *
5206   * When reprocessing the lkb, we may find that it's flagged for an overlapping
5207   * force-unlock or cancel, either from before recovery began, or after recovery
5208   * finished.  If this is the case, the unlock/cancel is done directly, and the
5209   * original operation is not initiated again (no _request_lock/_convert_lock.)
5210   */
5211  
5212  int dlm_recover_waiters_post(struct dlm_ls *ls)
5213  {
5214  	struct dlm_lkb *lkb;
5215  	struct dlm_rsb *r;
5216  	int error = 0, mstype, err, oc, ou;
5217  
5218  	while (1) {
5219  		if (dlm_locking_stopped(ls)) {
5220  			log_debug(ls, "recover_waiters_post aborted");
5221  			error = -EINTR;
5222  			break;
5223  		}
5224  
5225  		/*
5226  		 * Find an lkb from the waiters list that's been affected by
5227  		 * recovery node changes, and needs to be reprocessed.  Does
5228  		 * hold_lkb(), adding a refcount.
5229  		 */
5230  		lkb = find_resend_waiter(ls);
5231  		if (!lkb)
5232  			break;
5233  
5234  		r = lkb->lkb_resource;
5235  		hold_rsb(r);
5236  		lock_rsb(r);
5237  
5238  		/*
5239  		 * If the lkb has been flagged for a force unlock or cancel,
5240  		 * then the reprocessing below will be replaced by just doing
5241  		 * the unlock/cancel directly.
5242  		 */
5243  		mstype = lkb->lkb_wait_type;
5244  		oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
5245  					&lkb->lkb_iflags);
5246  		ou = test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT,
5247  					&lkb->lkb_iflags);
5248  		err = 0;
5249  
5250  		log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5251  			  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5252  			  "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5253  			  r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5254  			  dlm_dir_nodeid(r), oc, ou);
5255  
5256  		/*
5257  		 * No reply to the pre-recovery operation will now be received,
5258  		 * so a forced equivalent of remove_from_waiters() is needed to
5259  		 * reset the waiters state that was in place before recovery.
5260  		 */
5261  
5262  		clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5263  
5264  		/* Forcibly clear wait_type */
5265  		lkb->lkb_wait_type = 0;
5266  
5267  		/*
5268  		 * Forcibly reset wait_count and associated refcount.  The
5269  		 * wait_count will almost always be 1, but in case of an
5270  		 * overlapping unlock/cancel it could be 2: see where
5271  		 * add_to_waiters() finds the lkb is already on the waiters
5272  		 * list and does lkb_wait_count++; hold_lkb().
5273  		 */
5274  		while (lkb->lkb_wait_count) {
5275  			lkb->lkb_wait_count--;
5276  			unhold_lkb(lkb);
5277  		}
5278  
5279  		/* Forcibly remove from waiters list */
5280  		spin_lock_bh(&ls->ls_waiters_lock);
5281  		list_del_init(&lkb->lkb_wait_reply);
5282  		spin_unlock_bh(&ls->ls_waiters_lock);
5283  
5284  		/*
5285  		 * The lkb is now clear of all prior waiters state and can be
5286  		 * processed locally, or sent to remote node again, or directly
5287  		 * cancelled/unlocked.
5288  		 */
5289  
5290  		if (oc || ou) {
5291  			/* do an unlock or cancel instead of resending */
5292  			switch (mstype) {
5293  			case DLM_MSG_LOOKUP:
5294  			case DLM_MSG_REQUEST:
5295  				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5296  							-DLM_ECANCEL);
5297  				unhold_lkb(lkb); /* undoes create_lkb() */
5298  				break;
5299  			case DLM_MSG_CONVERT:
5300  				if (oc) {
5301  					queue_cast(r, lkb, -DLM_ECANCEL);
5302  				} else {
5303  					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5304  					_unlock_lock(r, lkb);
5305  				}
5306  				break;
5307  			default:
5308  				err = 1;
5309  			}
5310  		} else {
5311  			switch (mstype) {
5312  			case DLM_MSG_LOOKUP:
5313  			case DLM_MSG_REQUEST:
5314  				_request_lock(r, lkb);
5315  				if (r->res_nodeid != -1 && is_master(r))
5316  					confirm_master(r, 0);
5317  				break;
5318  			case DLM_MSG_CONVERT:
5319  				_convert_lock(r, lkb);
5320  				break;
5321  			default:
5322  				err = 1;
5323  			}
5324  		}
5325  
5326  		if (err) {
5327  			log_error(ls, "waiter %x msg %d r_nodeid %d "
5328  				  "dir_nodeid %d overlap %d %d",
5329  				  lkb->lkb_id, mstype, r->res_nodeid,
5330  				  dlm_dir_nodeid(r), oc, ou);
5331  		}
5332  		unlock_rsb(r);
5333  		put_rsb(r);
5334  		dlm_put_lkb(lkb);
5335  	}
5336  
5337  	return error;
5338  }
5339  
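/* Remove master-copy lkbs left on this queue by an earlier, aborted recovery;
   lkbs rebuilt during the current recovery sequence are kept. */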
5340  static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5341  			      struct list_head *list)
5342  {
5343  	struct dlm_lkb *lkb, *safe;
5344  
5345  	list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5346  		if (!is_master_copy(lkb))
5347  			continue;
5348  
5349  		/* don't purge lkbs we've added in recover_master_copy for
5350  		   the current recovery seq */
5351  
5352  		if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5353  			continue;
5354  
5355  		del_lkb(r, lkb);
5356  
5357  		/* this put should free the lkb */
5358  		if (!dlm_put_lkb(lkb))
5359  			log_error(ls, "purged mstcpy lkb not released");
5360  	}
5361  }
5362  
5363  void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5364  {
5365  	struct dlm_ls *ls = r->res_ls;
5366  
5367  	purge_mstcpy_list(ls, r, &r->res_grantqueue);
5368  	purge_mstcpy_list(ls, r, &r->res_convertqueue);
5369  	purge_mstcpy_list(ls, r, &r->res_waitqueue);
5370  }
5371  
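/* Remove master-copy lkbs held by nodes that have left the lockspace.  If a
   departed node held the lock at PW or EX with a valblk, flag the rsb so the
   lvb is invalidated during lvb recovery. */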
5372  static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5373  			    struct list_head *list,
5374  			    int nodeid_gone, unsigned int *count)
5375  {
5376  	struct dlm_lkb *lkb, *safe;
5377  
5378  	list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5379  		if (!is_master_copy(lkb))
5380  			continue;
5381  
5382  		if ((lkb->lkb_nodeid == nodeid_gone) ||
5383  		    dlm_is_removed(ls, lkb->lkb_nodeid)) {
5384  
5385  			/* tell recover_lvb to invalidate the lvb
5386  			   because a node holding EX/PW failed */
5387  			if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5388  			    (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5389  				rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5390  			}
5391  
5392  			del_lkb(r, lkb);
5393  
5394  			/* this put should free the lkb */
5395  			if (!dlm_put_lkb(lkb))
5396  				log_error(ls, "purged dead lkb not released");
5397  
5398  			rsb_set_flag(r, RSB_RECOVER_GRANT);
5399  
5400  			(*count)++;
5401  		}
5402  	}
5403  }
5404  
5405  /* Get rid of locks held by nodes that are gone. */
5406  
5407  void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list)
5408  {
5409  	struct dlm_rsb *r;
5410  	struct dlm_member *memb;
5411  	int nodes_count = 0;
5412  	int nodeid_gone = 0;
5413  	unsigned int lkb_count = 0;
5414  
5415  	/* cache one removed nodeid to optimize the common
5416  	   case of a single node removed */
5417  
5418  	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5419  		nodes_count++;
5420  		nodeid_gone = memb->nodeid;
5421  	}
5422  
5423  	if (!nodes_count)
5424  		return;
5425  
5426  	list_for_each_entry(r, root_list, res_root_list) {
5427  		lock_rsb(r);
5428  		if (r->res_nodeid != -1 && is_master(r)) {
5429  			purge_dead_list(ls, r, &r->res_grantqueue,
5430  					nodeid_gone, &lkb_count);
5431  			purge_dead_list(ls, r, &r->res_convertqueue,
5432  					nodeid_gone, &lkb_count);
5433  			purge_dead_list(ls, r, &r->res_waitqueue,
5434  					nodeid_gone, &lkb_count);
5435  		}
5436  		unlock_rsb(r);
5437  
5438  		cond_resched();
5439  	}
5440  
5441  	if (lkb_count)
5442  		log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5443  			  lkb_count, nodes_count);
5444  }
5445  
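/* Find an rsb flagged RECOVER_GRANT that we are the master of, take a
   reference on it and return it; clear the flag on any rsb we no longer
   master.  Returns NULL when no flagged rsb remains. */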
5446  static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
5447  {
5448  	struct dlm_rsb *r;
5449  
5450  	read_lock_bh(&ls->ls_rsbtbl_lock);
5451  	list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
5452  		if (!rsb_flag(r, RSB_RECOVER_GRANT))
5453  			continue;
5454  		if (!is_master(r)) {
5455  			rsb_clear_flag(r, RSB_RECOVER_GRANT);
5456  			continue;
5457  		}
5458  		hold_rsb(r);
5459  		read_unlock_bh(&ls->ls_rsbtbl_lock);
5460  		return r;
5461  	}
5462  	read_unlock_bh(&ls->ls_rsbtbl_lock);
5463  	return NULL;
5464  }
5465  
5466  /*
5467   * Attempt to grant locks on resources that we are the master of.
5468   * Locks may have become grantable during recovery because locks
5469   * from departed nodes have been purged (or not rebuilt), allowing
5470   * previously blocked locks to now be granted.  The subset of rsb's
5471   * we are interested in are those with lkb's on either the convert or
5472   * waiting queues.
5473   *
5474   * Simplest would be to go through each master rsb and check for non-empty
5475   * convert or waiting queues, and attempt to grant on those rsbs.
5476   * Checking the queues requires lock_rsb, though, for which we'd need
5477   * to release the rsbtbl lock.  This would make iterating through all
5478   * rsb's very inefficient.  So, we rely on earlier recovery routines
5479   * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5480   * locks for.
5481   */
5482  
5483  void dlm_recover_grant(struct dlm_ls *ls)
5484  {
5485  	struct dlm_rsb *r;
5486  	unsigned int count = 0;
5487  	unsigned int rsb_count = 0;
5488  	unsigned int lkb_count = 0;
5489  
5490  	while (1) {
5491  		r = find_grant_rsb(ls);
5492  		if (!r)
5493  			break;
5494  
5495  		rsb_count++;
5496  		count = 0;
5497  		lock_rsb(r);
5498  		/* the RECOVER_GRANT flag is checked in the grant path */
5499  		grant_pending_locks(r, &count);
5500  		rsb_clear_flag(r, RSB_RECOVER_GRANT);
5501  		lkb_count += count;
5502  		confirm_master(r, 0);
5503  		unlock_rsb(r);
5504  		put_rsb(r);
5505  		cond_resched();
5506  	}
5507  
5508  	if (lkb_count)
5509  		log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5510  			  lkb_count, rsb_count);
5511  }
5512  
5513  static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5514  					 uint32_t remid)
5515  {
5516  	struct dlm_lkb *lkb;
5517  
5518  	list_for_each_entry(lkb, head, lkb_statequeue) {
5519  		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5520  			return lkb;
5521  	}
5522  	return NULL;
5523  }
5524  
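/* Look on all three of the rsb's queues (grant, convert, wait) for an
   existing lkb matching the given remote nodeid/lkid. */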
5525  static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5526  				    uint32_t remid)
5527  {
5528  	struct dlm_lkb *lkb;
5529  
5530  	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5531  	if (lkb)
5532  		return lkb;
5533  	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5534  	if (lkb)
5535  		return lkb;
5536  	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5537  	if (lkb)
5538  		return lkb;
5539  	return NULL;
5540  }
5541  
5542  /* needs at least dlm_rcom + rcom_lock */
5543  static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5544  				  struct dlm_rsb *r, const struct dlm_rcom *rc)
5545  {
5546  	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5547  
5548  	lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5549  	lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5550  	lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5551  	lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5552  	dlm_set_dflags_val(lkb, le32_to_cpu(rl->rl_flags));
5553  	set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
5554  	lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5555  	lkb->lkb_rqmode = rl->rl_rqmode;
5556  	lkb->lkb_grmode = rl->rl_grmode;
5557  	/* don't set lkb_status because add_lkb wants to set it itself */
5558  
5559  	lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5560  	lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5561  
5562  	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5563  		int lvblen = le16_to_cpu(rc->rc_header.h_length) -
5564  			sizeof(struct dlm_rcom) - sizeof(struct rcom_lock);
5565  		if (lvblen > ls->ls_lvblen)
5566  			return -EINVAL;
5567  		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5568  		if (!lkb->lkb_lvbptr)
5569  			return -ENOMEM;
5570  		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5571  	}
5572  
5573  	/* Conversions between PR and CW (middle modes) need special handling.
5574  	   The real granted mode of these converting locks cannot be determined
5575  	   until all locks have been rebuilt on the rsb (recover_conversion) */
5576  
5577  	if (rl->rl_status == DLM_LKSTS_CONVERT && middle_conversion(lkb)) {
5578  		/* We may need to adjust grmode depending on other granted locks. */
5579  		log_limit(ls, "%s %x middle convert gr %d rq %d remote %d %x",
5580  			  __func__, lkb->lkb_id, lkb->lkb_grmode,
5581  			  lkb->lkb_rqmode, lkb->lkb_nodeid, lkb->lkb_remid);
5582  		rsb_set_flag(r, RSB_RECOVER_CONVERT);
5583  	}
5584  
5585  	return 0;
5586  }
5587  
5588  /* This lkb may have been recovered in a previous aborted recovery so we need
5589     to check if the rsb already has an lkb with the given remote nodeid/lkid.
5590     If so we just send back a standard reply.  If not, we create a new lkb with
5591     the given values and send back our lkid.  We send back our lkid by sending
5592     back the rcom_lock struct we got but with the remid field filled in. */
5593  
5594  /* needs at least dlm_rcom + rcom_lock */
5595  int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5596  			    __le32 *rl_remid, __le32 *rl_result)
5597  {
5598  	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5599  	struct dlm_rsb *r;
5600  	struct dlm_lkb *lkb;
5601  	uint32_t remid = 0;
5602  	int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5603  	int error;
5604  
5605  	/* init rl_remid with rcom lock rl_remid */
5606  	*rl_remid = rl->rl_remid;
5607  
5608  	if (rl->rl_parent_lkid) {
5609  		error = -EOPNOTSUPP;
5610  		goto out;
5611  	}
5612  
5613  	remid = le32_to_cpu(rl->rl_lkid);
5614  
5615  	/* In general we expect the rsb returned to be R_MASTER, but we don't
5616  	   have to require it.  Recovery of masters on one node can overlap
5617  	   recovery of locks on another node, so one node can send us MSTCPY
5618  	   locks before we've made ourselves master of this rsb.  We can still
5619  	   add new MSTCPY locks that we receive here without any harm; when
5620  	   we make ourselves master, dlm_recover_masters() won't touch the
5621  	   MSTCPY locks we've received early. */
5622  
5623  	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5624  			 from_nodeid, R_RECEIVE_RECOVER, &r);
5625  	if (error)
5626  		goto out;
5627  
5628  	lock_rsb(r);
5629  
5630  	if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5631  		log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5632  			  from_nodeid, remid);
5633  		error = -EBADR;
5634  		goto out_unlock;
5635  	}
5636  
5637  	lkb = search_remid(r, from_nodeid, remid);
5638  	if (lkb) {
5639  		error = -EEXIST;
5640  		goto out_remid;
5641  	}
5642  
5643  	error = create_lkb(ls, &lkb);
5644  	if (error)
5645  		goto out_unlock;
5646  
5647  	error = receive_rcom_lock_args(ls, lkb, r, rc);
5648  	if (error) {
5649  		__put_lkb(ls, lkb);
5650  		goto out_unlock;
5651  	}
5652  
5653  	attach_lkb(r, lkb);
5654  	add_lkb(r, lkb, rl->rl_status);
5655  	ls->ls_recover_locks_in++;
5656  
5657  	if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5658  		rsb_set_flag(r, RSB_RECOVER_GRANT);
5659  
5660   out_remid:
5661  	/* this is the new value returned to the lock holder for
5662  	   saving in its process-copy lkb */
5663  	*rl_remid = cpu_to_le32(lkb->lkb_id);
5664  
5665  	lkb->lkb_recover_seq = ls->ls_recover_seq;
5666  
5667   out_unlock:
5668  	unlock_rsb(r);
5669  	put_rsb(r);
5670   out:
5671  	if (error && error != -EEXIST)
5672  		log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5673  			  from_nodeid, remid, error);
5674  	*rl_result = cpu_to_le32(error);
5675  	return error;
5676  }
5677  
5678  /* needs at least dlm_rcom + rcom_lock */
5679  int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5680  			     uint64_t seq)
5681  {
5682  	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5683  	struct dlm_rsb *r;
5684  	struct dlm_lkb *lkb;
5685  	uint32_t lkid, remid;
5686  	int error, result;
5687  
5688  	lkid = le32_to_cpu(rl->rl_lkid);
5689  	remid = le32_to_cpu(rl->rl_remid);
5690  	result = le32_to_cpu(rl->rl_result);
5691  
5692  	error = find_lkb(ls, lkid, &lkb);
5693  	if (error) {
5694  		log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5695  			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5696  			  result);
5697  		return error;
5698  	}
5699  
5700  	r = lkb->lkb_resource;
5701  	hold_rsb(r);
5702  	lock_rsb(r);
5703  
5704  	if (!is_process_copy(lkb)) {
5705  		log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5706  			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5707  			  result);
5708  		dlm_dump_rsb(r);
5709  		unlock_rsb(r);
5710  		put_rsb(r);
5711  		dlm_put_lkb(lkb);
5712  		return -EINVAL;
5713  	}
5714  
5715  	switch (result) {
5716  	case -EBADR:
5717  		/* There's a chance the new master received our lock before
5718  		   dlm_recover_master_reply(); this wouldn't happen if we did
5719  		   a barrier between recover_masters and recover_locks. */
5720  
5721  		log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5722  			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5723  			  result);
5724  
5725  		dlm_send_rcom_lock(r, lkb, seq);
5726  		goto out;
5727  	case -EEXIST:
5728  	case 0:
5729  		lkb->lkb_remid = remid;
5730  		break;
5731  	default:
5732  		log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5733  			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5734  			  result);
5735  	}
5736  
5737  	/* an ack for dlm_recover_locks() which waits for replies from
5738  	   all the locks it sends to new masters */
5739  	dlm_recovered_lock(r);
5740   out:
5741  	unlock_rsb(r);
5742  	put_rsb(r);
5743  	dlm_put_lkb(lkb);
5744  
5745  	return 0;
5746  }
5747  
5748  int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5749  		     int mode, uint32_t flags, void *name, unsigned int namelen)
5750  {
5751  	struct dlm_lkb *lkb;
5752  	struct dlm_args args;
5753  	bool do_put = true;
5754  	int error;
5755  
5756  	dlm_lock_recovery(ls);
5757  
5758  	error = create_lkb(ls, &lkb);
5759  	if (error) {
5760  		kfree(ua);
5761  		goto out;
5762  	}
5763  
5764  	trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
5765  
5766  	if (flags & DLM_LKF_VALBLK) {
5767  		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5768  		if (!ua->lksb.sb_lvbptr) {
5769  			kfree(ua);
5770  			error = -ENOMEM;
5771  			goto out_put;
5772  		}
5773  	}
5774  	error = set_lock_args(mode, &ua->lksb, flags, namelen, fake_astfn, ua,
5775  			      fake_bastfn, &args);
5776  	if (error) {
5777  		kfree(ua->lksb.sb_lvbptr);
5778  		ua->lksb.sb_lvbptr = NULL;
5779  		kfree(ua);
5780  		goto out_put;
5781  	}
5782  
5783  	/* After ua is attached to lkb it will be freed by dlm_free_lkb().
5784  	   When DLM_DFL_USER_BIT is set, the dlm knows that this is a userspace
5785  	   lock and that lkb_astparam is the dlm_user_args structure. */
5786  	set_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags);
5787  	error = request_lock(ls, lkb, name, namelen, &args);
5788  
5789  	switch (error) {
5790  	case 0:
5791  		break;
5792  	case -EINPROGRESS:
5793  		error = 0;
5794  		break;
5795  	case -EAGAIN:
5796  		error = 0;
5797  		fallthrough;
5798  	default:
5799  		goto out_put;
5800  	}
5801  
5802  	/* add this new lkb to the per-process list of locks */
5803  	spin_lock_bh(&ua->proc->locks_spin);
5804  	hold_lkb(lkb);
5805  	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5806  	spin_unlock_bh(&ua->proc->locks_spin);
5807  	do_put = false;
5808   out_put:
5809  	trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
5810  	if (do_put)
5811  		__put_lkb(ls, lkb);
5812   out:
5813  	dlm_unlock_recovery(ls);
5814  	return error;
5815  }
5816  
5817  int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5818  		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
5819  {
5820  	struct dlm_lkb *lkb;
5821  	struct dlm_args args;
5822  	struct dlm_user_args *ua;
5823  	int error;
5824  
5825  	dlm_lock_recovery(ls);
5826  
5827  	error = find_lkb(ls, lkid, &lkb);
5828  	if (error)
5829  		goto out;
5830  
5831  	trace_dlm_lock_start(ls, lkb, NULL, 0, mode, flags);
5832  
5833  	/* user can change the params on its lock when it converts it, or
5834  	   add an lvb that didn't exist before */
5835  
5836  	ua = lkb->lkb_ua;
5837  
5838  	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5839  		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5840  		if (!ua->lksb.sb_lvbptr) {
5841  			error = -ENOMEM;
5842  			goto out_put;
5843  		}
5844  	}
5845  	if (lvb_in && ua->lksb.sb_lvbptr)
5846  		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5847  
5848  	ua->xid = ua_tmp->xid;
5849  	ua->castparam = ua_tmp->castparam;
5850  	ua->castaddr = ua_tmp->castaddr;
5851  	ua->bastparam = ua_tmp->bastparam;
5852  	ua->bastaddr = ua_tmp->bastaddr;
5853  	ua->user_lksb = ua_tmp->user_lksb;
5854  
5855  	error = set_lock_args(mode, &ua->lksb, flags, 0, fake_astfn, ua,
5856  			      fake_bastfn, &args);
5857  	if (error)
5858  		goto out_put;
5859  
5860  	error = convert_lock(ls, lkb, &args);
5861  
5862  	if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5863  		error = 0;
5864   out_put:
5865  	trace_dlm_lock_end(ls, lkb, NULL, 0, mode, flags, error, false);
5866  	dlm_put_lkb(lkb);
5867   out:
5868  	dlm_unlock_recovery(ls);
5869  	kfree(ua_tmp);
5870  	return error;
5871  }
5872  
5873  /*
5874   * The caller asks for an orphan lock on a given resource with a given mode.
5875   * If a matching lock exists, it's moved to the owner's list of locks and
5876   * the lkid is returned.
5877   */
5878  
5879  int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5880  		     int mode, uint32_t flags, void *name, unsigned int namelen,
5881  		     uint32_t *lkid)
5882  {
5883  	struct dlm_lkb *lkb = NULL, *iter;
5884  	struct dlm_user_args *ua;
5885  	int found_other_mode = 0;
5886  	int rv = 0;
5887  
5888  	spin_lock_bh(&ls->ls_orphans_lock);
5889  	list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
5890  		if (iter->lkb_resource->res_length != namelen)
5891  			continue;
5892  		if (memcmp(iter->lkb_resource->res_name, name, namelen))
5893  			continue;
5894  		if (iter->lkb_grmode != mode) {
5895  			found_other_mode = 1;
5896  			continue;
5897  		}
5898  
5899  		lkb = iter;
5900  		list_del_init(&iter->lkb_ownqueue);
5901  		clear_bit(DLM_DFL_ORPHAN_BIT, &iter->lkb_dflags);
5902  		*lkid = iter->lkb_id;
5903  		break;
5904  	}
5905  	spin_unlock_bh(&ls->ls_orphans_lock);
5906  
5907  	if (!lkb && found_other_mode) {
5908  		rv = -EAGAIN;
5909  		goto out;
5910  	}
5911  
5912  	if (!lkb) {
5913  		rv = -ENOENT;
5914  		goto out;
5915  	}
5916  
5917  	lkb->lkb_exflags = flags;
5918  	lkb->lkb_ownpid = (int) current->pid;
5919  
5920  	ua = lkb->lkb_ua;
5921  
5922  	ua->proc = ua_tmp->proc;
5923  	ua->xid = ua_tmp->xid;
5924  	ua->castparam = ua_tmp->castparam;
5925  	ua->castaddr = ua_tmp->castaddr;
5926  	ua->bastparam = ua_tmp->bastparam;
5927  	ua->bastaddr = ua_tmp->bastaddr;
5928  	ua->user_lksb = ua_tmp->user_lksb;
5929  
5930  	/*
5931  	 * The lkb reference from the ls_orphans list was not
5932  	 * removed above, and is now considered the reference
5933  	 * for the proc locks list.
5934  	 */
5935  
5936  	spin_lock_bh(&ua->proc->locks_spin);
5937  	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5938  	spin_unlock_bh(&ua->proc->locks_spin);
5939   out:
5940  	kfree(ua_tmp);
5941  	return rv;
5942  }
5943  
5944  int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5945  		    uint32_t flags, uint32_t lkid, char *lvb_in)
5946  {
5947  	struct dlm_lkb *lkb;
5948  	struct dlm_args args;
5949  	struct dlm_user_args *ua;
5950  	int error;
5951  
5952  	dlm_lock_recovery(ls);
5953  
5954  	error = find_lkb(ls, lkid, &lkb);
5955  	if (error)
5956  		goto out;
5957  
5958  	trace_dlm_unlock_start(ls, lkb, flags);
5959  
5960  	ua = lkb->lkb_ua;
5961  
5962  	if (lvb_in && ua->lksb.sb_lvbptr)
5963  		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5964  	if (ua_tmp->castparam)
5965  		ua->castparam = ua_tmp->castparam;
5966  	ua->user_lksb = ua_tmp->user_lksb;
5967  
5968  	error = set_unlock_args(flags, ua, &args);
5969  	if (error)
5970  		goto out_put;
5971  
5972  	error = unlock_lock(ls, lkb, &args);
5973  
5974  	if (error == -DLM_EUNLOCK)
5975  		error = 0;
5976  	/* from validate_unlock_args() */
5977  	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5978  		error = 0;
5979  	if (error)
5980  		goto out_put;
5981  
5982  	spin_lock_bh(&ua->proc->locks_spin);
5983  	/* dlm_user_add_cb() may have already taken lkb off the proc list */
5984  	if (!list_empty(&lkb->lkb_ownqueue))
5985  		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5986  	spin_unlock_bh(&ua->proc->locks_spin);
5987   out_put:
5988  	trace_dlm_unlock_end(ls, lkb, flags, error);
5989  	dlm_put_lkb(lkb);
5990   out:
5991  	dlm_unlock_recovery(ls);
5992  	kfree(ua_tmp);
5993  	return error;
5994  }
5995  
5996  int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5997  		    uint32_t flags, uint32_t lkid)
5998  {
5999  	struct dlm_lkb *lkb;
6000  	struct dlm_args args;
6001  	struct dlm_user_args *ua;
6002  	int error;
6003  
6004  	dlm_lock_recovery(ls);
6005  
6006  	error = find_lkb(ls, lkid, &lkb);
6007  	if (error)
6008  		goto out;
6009  
6010  	trace_dlm_unlock_start(ls, lkb, flags);
6011  
6012  	ua = lkb->lkb_ua;
6013  	if (ua_tmp->castparam)
6014  		ua->castparam = ua_tmp->castparam;
6015  	ua->user_lksb = ua_tmp->user_lksb;
6016  
6017  	error = set_unlock_args(flags, ua, &args);
6018  	if (error)
6019  		goto out_put;
6020  
6021  	error = cancel_lock(ls, lkb, &args);
6022  
6023  	if (error == -DLM_ECANCEL)
6024  		error = 0;
6025  	/* from validate_unlock_args() */
6026  	if (error == -EBUSY)
6027  		error = 0;
6028   out_put:
6029  	trace_dlm_unlock_end(ls, lkb, flags, error);
6030  	dlm_put_lkb(lkb);
6031   out:
6032  	dlm_unlock_recovery(ls);
6033  	kfree(ua_tmp);
6034  	return error;
6035  }
6036  
6037  int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
6038  {
6039  	struct dlm_lkb *lkb;
6040  	struct dlm_args args;
6041  	struct dlm_user_args *ua;
6042  	struct dlm_rsb *r;
6043  	int error;
6044  
6045  	dlm_lock_recovery(ls);
6046  
6047  	error = find_lkb(ls, lkid, &lkb);
6048  	if (error)
6049  		goto out;
6050  
6051  	trace_dlm_unlock_start(ls, lkb, flags);
6052  
6053  	ua = lkb->lkb_ua;
6054  
6055  	error = set_unlock_args(flags, ua, &args);
6056  	if (error)
6057  		goto out_put;
6058  
6059  	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
6060  
6061  	r = lkb->lkb_resource;
6062  	hold_rsb(r);
6063  	lock_rsb(r);
6064  
6065  	error = validate_unlock_args(lkb, &args);
6066  	if (error)
6067  		goto out_r;
6068  	set_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags);
6069  
6070  	error = _cancel_lock(r, lkb);
6071   out_r:
6072  	unlock_rsb(r);
6073  	put_rsb(r);
6074  
6075  	if (error == -DLM_ECANCEL)
6076  		error = 0;
6077  	/* from validate_unlock_args() */
6078  	if (error == -EBUSY)
6079  		error = 0;
6080   out_put:
6081  	trace_dlm_unlock_end(ls, lkb, flags, error);
6082  	dlm_put_lkb(lkb);
6083   out:
6084  	dlm_unlock_recovery(ls);
6085  	return error;
6086  }
6087  
6088  /* lkb's that are removed from the waiters list by revert are just left on the
6089     orphans list with the granted orphan locks, to be freed by purge */
6090  
6091  static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6092  {
6093  	struct dlm_args args;
6094  	int error;
6095  
6096  	hold_lkb(lkb); /* reference for the ls_orphans list */
6097  	spin_lock_bh(&ls->ls_orphans_lock);
6098  	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6099  	spin_unlock_bh(&ls->ls_orphans_lock);
6100  
6101  	set_unlock_args(0, lkb->lkb_ua, &args);
6102  
6103  	error = cancel_lock(ls, lkb, &args);
6104  	if (error == -DLM_ECANCEL)
6105  		error = 0;
6106  	return error;
6107  }
6108  
6109  /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
6110     granted.  Regardless of what rsb queue the lock is on, it's removed and
6111     freed.  The IVVALBLK flag causes the lvb on the resource to be invalidated
6112     if our lock is PW/EX (it's ignored if our granted mode is smaller.) */
6113  
6114  static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6115  {
6116  	struct dlm_args args;
6117  	int error;
6118  
6119  	set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
6120  			lkb->lkb_ua, &args);
6121  
6122  	error = unlock_lock(ls, lkb, &args);
6123  	if (error == -DLM_EUNLOCK)
6124  		error = 0;
6125  	return error;
6126  }
6127  
6128  /* We have to release the ls_clear_proc_locks spinlock before calling
6129     unlock_proc_lock() (which does lock_rsb) due to deadlock with receiving a
6130     message that does lock_rsb followed by dlm_user_add_cb() */
6131  
6132  static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
6133  				     struct dlm_user_proc *proc)
6134  {
6135  	struct dlm_lkb *lkb = NULL;
6136  
6137  	spin_lock_bh(&ls->ls_clear_proc_locks);
6138  	if (list_empty(&proc->locks))
6139  		goto out;
6140  
6141  	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
6142  	list_del_init(&lkb->lkb_ownqueue);
6143  
6144  	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6145  		set_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags);
6146  	else
6147  		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6148   out:
6149  	spin_unlock_bh(&ls->ls_clear_proc_locks);
6150  	return lkb;
6151  }
6152  
6153  /* The ls_clear_proc_locks spinlock protects against dlm_user_add_cb() which
6154     1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
6155     which we clear here. */
6156  
6157  /* proc CLOSING flag is set so no more device_reads should look at proc->asts
6158     list, and no more device_writes should add lkb's to proc->locks list; so we
6159     shouldn't need to take asts_spin or locks_spin here.  this assumes that
6160     device reads/writes/closes are serialized -- FIXME: we may need to serialize
6161     them ourselves. */
6162  
6163  void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6164  {
6165  	struct dlm_callback *cb, *cb_safe;
6166  	struct dlm_lkb *lkb, *safe;
6167  
6168  	dlm_lock_recovery(ls);
6169  
6170  	while (1) {
6171  		lkb = del_proc_lock(ls, proc);
6172  		if (!lkb)
6173  			break;
6174  		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6175  			orphan_proc_lock(ls, lkb);
6176  		else
6177  			unlock_proc_lock(ls, lkb);
6178  
6179  		/* this removes the reference for the proc->locks list
6180  		   added by dlm_user_request, it may result in the lkb
6181  		   being freed */
6182  
6183  		dlm_put_lkb(lkb);
6184  	}
6185  
6186  	spin_lock_bh(&ls->ls_clear_proc_locks);
6187  
6188  	/* in-progress unlocks */
6189  	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6190  		list_del_init(&lkb->lkb_ownqueue);
6191  		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6192  		dlm_put_lkb(lkb);
6193  	}
6194  
6195  	list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6196  		list_del(&cb->list);
6197  		dlm_free_cb(cb);
6198  	}
6199  
6200  	spin_unlock_bh(&ls->ls_clear_proc_locks);
6201  	dlm_unlock_recovery(ls);
6202  }
6203  
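/* Force-unlock every lock still on the proc's locks list, mark in-progress
   unlocks dead, and free any callbacks queued for the proc. */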
6204  static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6205  {
6206  	struct dlm_callback *cb, *cb_safe;
6207  	struct dlm_lkb *lkb, *safe;
6208  
6209  	while (1) {
6210  		lkb = NULL;
6211  		spin_lock_bh(&proc->locks_spin);
6212  		if (!list_empty(&proc->locks)) {
6213  			lkb = list_entry(proc->locks.next, struct dlm_lkb,
6214  					 lkb_ownqueue);
6215  			list_del_init(&lkb->lkb_ownqueue);
6216  		}
6217  		spin_unlock_bh(&proc->locks_spin);
6218  
6219  		if (!lkb)
6220  			break;
6221  
6222  		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6223  		unlock_proc_lock(ls, lkb);
6224  		dlm_put_lkb(lkb); /* ref from proc->locks list */
6225  	}
6226  
6227  	spin_lock_bh(&proc->locks_spin);
6228  	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6229  		list_del_init(&lkb->lkb_ownqueue);
6230  		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6231  		dlm_put_lkb(lkb);
6232  	}
6233  	spin_unlock_bh(&proc->locks_spin);
6234  
6235  	spin_lock_bh(&proc->asts_spin);
6236  	list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6237  		list_del(&cb->list);
6238  		dlm_free_cb(cb);
6239  	}
6240  	spin_unlock_bh(&proc->asts_spin);
6241  }
6242  
6243  /* pid of 0 means purge all orphans */
6244  
6245  static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6246  {
6247  	struct dlm_lkb *lkb, *safe;
6248  
6249  	spin_lock_bh(&ls->ls_orphans_lock);
6250  	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6251  		if (pid && lkb->lkb_ownpid != pid)
6252  			continue;
6253  		unlock_proc_lock(ls, lkb);
6254  		list_del_init(&lkb->lkb_ownqueue);
6255  		dlm_put_lkb(lkb);
6256  	}
6257  	spin_unlock_bh(&ls->ls_orphans_lock);
6258  }
6259  
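/* Ask a remote node to purge orphan locks it holds for the given pid; the
   receiving node processes DLM_MSG_PURGE by calling do_purge(). */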
6260  static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6261  {
6262  	struct dlm_message *ms;
6263  	struct dlm_mhandle *mh;
6264  	int error;
6265  
6266  	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6267  				DLM_MSG_PURGE, &ms, &mh);
6268  	if (error)
6269  		return error;
6270  	ms->m_nodeid = cpu_to_le32(nodeid);
6271  	ms->m_pid = cpu_to_le32(pid);
6272  
6273  	return send_message(mh, ms, NULL, 0);
6274  }
6275  
6276  int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6277  		   int nodeid, int pid)
6278  {
6279  	int error = 0;
6280  
6281  	if (nodeid && (nodeid != dlm_our_nodeid())) {
6282  		error = send_purge(ls, nodeid, pid);
6283  	} else {
6284  		dlm_lock_recovery(ls);
6285  		if (pid == current->pid)
6286  			purge_proc_locks(ls, proc);
6287  		else
6288  			do_purge(ls, nodeid, pid);
6289  		dlm_unlock_recovery(ls);
6290  	}
6291  	return error;
6292  }
6293  
6294  /* debug functionality */
6295  int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
6296  		      int lkb_nodeid, unsigned int lkb_dflags, int lkb_status)
6297  {
6298  	struct dlm_lksb *lksb;
6299  	struct dlm_lkb *lkb;
6300  	struct dlm_rsb *r;
6301  	int error;
6302  
6303  	/* we currently can't set a valid user lock */
6304  	if (lkb_dflags & BIT(DLM_DFL_USER_BIT))
6305  		return -EOPNOTSUPP;
6306  
6307  	lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
6308  	if (!lksb)
6309  		return -ENOMEM;
6310  
6311  	error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
6312  	if (error) {
6313  		kfree(lksb);
6314  		return error;
6315  	}
6316  
6317  	dlm_set_dflags_val(lkb, lkb_dflags);
6318  	lkb->lkb_nodeid = lkb_nodeid;
6319  	lkb->lkb_lksb = lksb;
6320  	/* user-specific pointer; just don't leave it NULL for kernel locks */
6321  	if (~lkb_dflags & BIT(DLM_DFL_USER_BIT))
6322  		lkb->lkb_astparam = (void *)0xDEADBEEF;
6323  
6324  	error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
6325  	if (error) {
6326  		kfree(lksb);
6327  		__put_lkb(ls, lkb);
6328  		return error;
6329  	}
6330  
6331  	lock_rsb(r);
6332  	attach_lkb(r, lkb);
6333  	add_lkb(r, lkb, lkb_status);
6334  	unlock_rsb(r);
6335  	put_rsb(r);
6336  
6337  	return 0;
6338  }
6339  
6340  int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
6341  				 int mstype, int to_nodeid)
6342  {
6343  	struct dlm_lkb *lkb;
6344  	int error;
6345  
6346  	error = find_lkb(ls, lkb_id, &lkb);
6347  	if (error)
6348  		return error;
6349  
6350  	add_to_waiters(lkb, mstype, to_nodeid);
6351  	dlm_put_lkb(lkb);
6352  	return 0;
6353  }
6354  
6355