xref: /linux/fs/dlm/lock.c (revision 94f2dab24ee87b4322a7c9e60959391717368e5e)
1  // SPDX-License-Identifier: GPL-2.0-only
2  /******************************************************************************
3  *******************************************************************************
4  **
5  **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
6  **
7  **
8  *******************************************************************************
9  ******************************************************************************/
10  
11  /* Central locking logic has four stages:
12  
13     dlm_lock()
14     dlm_unlock()
15  
16     request_lock(ls, lkb)
17     convert_lock(ls, lkb)
18     unlock_lock(ls, lkb)
19     cancel_lock(ls, lkb)
20  
21     _request_lock(r, lkb)
22     _convert_lock(r, lkb)
23     _unlock_lock(r, lkb)
24     _cancel_lock(r, lkb)
25  
26     do_request(r, lkb)
27     do_convert(r, lkb)
28     do_unlock(r, lkb)
29     do_cancel(r, lkb)
30  
31     Stage 1 (lock, unlock) is mainly about checking input args and
32     splitting into one of the four main operations:
33  
34         dlm_lock          = request_lock
35         dlm_lock+CONVERT  = convert_lock
36         dlm_unlock        = unlock_lock
37         dlm_unlock+CANCEL = cancel_lock
38  
39     Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
40     provided to the next stage.
41  
42     Stage 3, _xxxx_lock(), determines if the operation is local or remote.
43     When remote, it calls send_xxxx(), when local it calls do_xxxx().
44  
45     Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
46     given rsb and lkb and queues callbacks.
47  
48     For remote operations, send_xxxx() results in the corresponding do_xxxx()
49     function being executed on the remote node.  The connecting send/receive
50     calls on local (L) and remote (R) nodes:
51  
52     L: send_xxxx()              ->  R: receive_xxxx()
53                                     R: do_xxxx()
54     L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
55  */
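
/* For example, following the stages above, a conversion request on a
   remotely mastered rsb takes roughly this path (a sketch of the naming
   convention, not an exact call trace):

     dlm_lock(... DLM_LKF_CONVERT ...)
       convert_lock(ls, lkb)
         _convert_lock(r, lkb)
           L: send_convert(r, lkb)       ->  R: receive_convert()
                                             R: do_convert()
           L: receive_convert_reply()    <-  R: send_convert_reply()
*/
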
56  #include <trace/events/dlm.h>
57  
58  #include <linux/types.h>
59  #include <linux/rbtree.h>
60  #include <linux/slab.h>
61  #include "dlm_internal.h"
62  #include <linux/dlm_device.h>
63  #include "memory.h"
64  #include "midcomms.h"
65  #include "requestqueue.h"
66  #include "util.h"
67  #include "dir.h"
68  #include "member.h"
69  #include "lockspace.h"
70  #include "ast.h"
71  #include "lock.h"
72  #include "rcom.h"
73  #include "recover.h"
74  #include "lvb_table.h"
75  #include "user.h"
76  #include "config.h"
77  
78  static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79  static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80  static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81  static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82  static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83  static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84  static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85  static int send_remove(struct dlm_rsb *r);
86  static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87  static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88  static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89  				    const struct dlm_message *ms, bool local);
90  static int receive_extralen(const struct dlm_message *ms);
91  static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92  static void toss_rsb(struct kref *kref);
93  
94  /*
95   * Lock compatibility matrix - thanks Steve
96   * UN = Unlocked state. Not really a state, used as a flag
97   * PD = Padding. Used to make the matrix a nice power of two in size
98   * Other states are the same as the VMS DLM.
99   * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
100   */
101  
102  static const int __dlm_compat_matrix[8][8] = {
103        /* UN NL CR CW PR PW EX PD */
104          {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
105          {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
106          {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
107          {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
108          {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
109          {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
110          {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
111          {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
112  };
113  
114  /*
115   * This defines the direction of transfer of LVB data.
116   * Granted mode is the row; requested mode is the column.
117   * Usage: matrix[grmode+1][rqmode+1]
118   * 1 = LVB is returned to the caller
119   * 0 = LVB is written to the resource
120   * -1 = nothing happens to the LVB
121   */
122  
123  const int dlm_lvb_operations[8][8] = {
124          /* UN   NL  CR  CW  PR  PW  EX  PD*/
125          {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
126          {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
127          {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
128          {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
129          {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
130          {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
131          {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
132          {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
133  };
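
/* For example, reading dlm_lvb_operations above: converting a lock up from
   PR to EX gives dlm_lvb_operations[DLM_LOCK_PR + 1][DLM_LOCK_EX + 1] == 1,
   so the LVB is returned to the caller, while converting down from PW to NL
   gives 0, so the caller's LVB is written to the resource. */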
134  
135  #define modes_compat(gr, rq) \
136  	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
137  
138  int dlm_modes_compat(int mode1, int mode2)
139  {
140  	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
141  }
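
/* For example, from __dlm_compat_matrix above, two PR (protected read) locks
   can be granted on the same rsb at once, but PR and EX cannot:

     dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR) == 1
     dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_EX) == 0
*/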
142  
143  /*
144   * Compatibility matrix for conversions with QUECVT set.
145   * Granted mode is the row; requested mode is the column.
146   * Usage: matrix[grmode+1][rqmode+1]
147   */
148  
149  static const int __quecvt_compat_matrix[8][8] = {
150        /* UN NL CR CW PR PW EX PD */
151          {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
152          {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
153          {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
154          {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
155          {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
156          {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
157          {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
158          {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
159  };
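
/* For example, reading the matrix above: a QUECVT conversion from NL up to
   any higher mode is permitted (row NL is 1 for CR through EX), while no
   QUECVT conversion from EX is permitted (row EX is all zero). */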
160  
161  void dlm_print_lkb(struct dlm_lkb *lkb)
162  {
163  	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
164  	       "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
165  	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166  	       dlm_iflags_val(lkb), lkb->lkb_status, lkb->lkb_rqmode,
167  	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
168  	       (unsigned long long)lkb->lkb_recover_seq);
169  }
170  
171  static void dlm_print_rsb(struct dlm_rsb *r)
172  {
173  	printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
174  	       "rlc %d name %s\n",
175  	       r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
176  	       r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
177  	       r->res_name);
178  }
179  
180  void dlm_dump_rsb(struct dlm_rsb *r)
181  {
182  	struct dlm_lkb *lkb;
183  
184  	dlm_print_rsb(r);
185  
186  	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
187  	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
188  	printk(KERN_ERR "rsb lookup list\n");
189  	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
190  		dlm_print_lkb(lkb);
191  	printk(KERN_ERR "rsb grant queue:\n");
192  	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
193  		dlm_print_lkb(lkb);
194  	printk(KERN_ERR "rsb convert queue:\n");
195  	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
196  		dlm_print_lkb(lkb);
197  	printk(KERN_ERR "rsb wait queue:\n");
198  	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
199  		dlm_print_lkb(lkb);
200  }
201  
202  /* Threads cannot use the lockspace while it's being recovered */
203  
204  static inline void dlm_lock_recovery(struct dlm_ls *ls)
205  {
206  	down_read(&ls->ls_in_recovery);
207  }
208  
209  void dlm_unlock_recovery(struct dlm_ls *ls)
210  {
211  	up_read(&ls->ls_in_recovery);
212  }
213  
214  int dlm_lock_recovery_try(struct dlm_ls *ls)
215  {
216  	return down_read_trylock(&ls->ls_in_recovery);
217  }
218  
219  static inline int can_be_queued(struct dlm_lkb *lkb)
220  {
221  	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
222  }
223  
224  static inline int force_blocking_asts(struct dlm_lkb *lkb)
225  {
226  	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
227  }
228  
229  static inline int is_demoted(struct dlm_lkb *lkb)
230  {
231  	return test_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
232  }
233  
234  static inline int is_altmode(struct dlm_lkb *lkb)
235  {
236  	return test_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
237  }
238  
239  static inline int is_granted(struct dlm_lkb *lkb)
240  {
241  	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
242  }
243  
244  static inline int is_remote(struct dlm_rsb *r)
245  {
246  	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
247  	return !!r->res_nodeid;
248  }
249  
250  static inline int is_process_copy(struct dlm_lkb *lkb)
251  {
252  	return lkb->lkb_nodeid &&
253  	       !test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
254  }
255  
256  static inline int is_master_copy(struct dlm_lkb *lkb)
257  {
258  	return test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
259  }
260  
261  static inline int middle_conversion(struct dlm_lkb *lkb)
262  {
263  	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264  	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265  		return 1;
266  	return 0;
267  }
268  
269  static inline int down_conversion(struct dlm_lkb *lkb)
270  {
271  	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272  }
273  
274  static inline int is_overlap_unlock(struct dlm_lkb *lkb)
275  {
276  	return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
277  }
278  
279  static inline int is_overlap_cancel(struct dlm_lkb *lkb)
280  {
281  	return test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
282  }
283  
284  static inline int is_overlap(struct dlm_lkb *lkb)
285  {
286  	return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags) ||
287  	       test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
288  }
289  
290  static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291  {
292  	if (is_master_copy(lkb))
293  		return;
294  
295  	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
296  
297  	if (rv == -DLM_ECANCEL &&
298  	    test_and_clear_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags))
299  		rv = -EDEADLK;
300  
301  	dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, dlm_sbflags_val(lkb));
302  }
303  
304  static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
305  {
306  	queue_cast(r, lkb,
307  		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
308  }
309  
310  static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
311  {
312  	if (is_master_copy(lkb)) {
313  		send_bast(r, lkb, rqmode);
314  	} else {
315  		dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
316  	}
317  }
318  
319  /*
320   * Basic operations on rsb's and lkb's
321   */
322  
323  /* This is only called to add a reference when the code already holds
324     a valid reference to the rsb, so there's no need for locking. */
325  
326  static inline void hold_rsb(struct dlm_rsb *r)
327  {
328  	kref_get(&r->res_ref);
329  }
330  
331  void dlm_hold_rsb(struct dlm_rsb *r)
332  {
333  	hold_rsb(r);
334  }
335  
336  /* When all references to the rsb are gone it's transferred to
337     the tossed list for later disposal. */
338  
339  static void put_rsb(struct dlm_rsb *r)
340  {
341  	struct dlm_ls *ls = r->res_ls;
342  	uint32_t bucket = r->res_bucket;
343  	int rv;
344  
345  	rv = kref_put_lock(&r->res_ref, toss_rsb,
346  			   &ls->ls_rsbtbl[bucket].lock);
347  	if (rv)
348  		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
349  }
350  
351  void dlm_put_rsb(struct dlm_rsb *r)
352  {
353  	put_rsb(r);
354  }
355  
356  static int pre_rsb_struct(struct dlm_ls *ls)
357  {
358  	struct dlm_rsb *r1, *r2;
359  	int count = 0;
360  
361  	spin_lock(&ls->ls_new_rsb_spin);
362  	if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
363  		spin_unlock(&ls->ls_new_rsb_spin);
364  		return 0;
365  	}
366  	spin_unlock(&ls->ls_new_rsb_spin);
367  
368  	r1 = dlm_allocate_rsb(ls);
369  	r2 = dlm_allocate_rsb(ls);
370  
371  	spin_lock(&ls->ls_new_rsb_spin);
372  	if (r1) {
373  		list_add(&r1->res_hashchain, &ls->ls_new_rsb);
374  		ls->ls_new_rsb_count++;
375  	}
376  	if (r2) {
377  		list_add(&r2->res_hashchain, &ls->ls_new_rsb);
378  		ls->ls_new_rsb_count++;
379  	}
380  	count = ls->ls_new_rsb_count;
381  	spin_unlock(&ls->ls_new_rsb_spin);
382  
383  	if (!count)
384  		return -ENOMEM;
385  	return 0;
386  }
387  
388  /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
389     unlock any spinlocks, go back and call pre_rsb_struct again.
390     Otherwise, take an rsb off the list and return it. */
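
/* Callers use pre_rsb_struct()/get_rsb_struct() in a retry loop of roughly
   this shape (see find_rsb_dir() below):

     retry:
        error = pre_rsb_struct(ls);
        ...
        spin_lock(&ls->ls_rsbtbl[b].lock);
        ...
        error = get_rsb_struct(ls, name, len, &r);
        if (error == -EAGAIN) {
                spin_unlock(&ls->ls_rsbtbl[b].lock);
                goto retry;
        }
*/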
391  
392  static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
393  			  struct dlm_rsb **r_ret)
394  {
395  	struct dlm_rsb *r;
396  	int count;
397  
398  	spin_lock(&ls->ls_new_rsb_spin);
399  	if (list_empty(&ls->ls_new_rsb)) {
400  		count = ls->ls_new_rsb_count;
401  		spin_unlock(&ls->ls_new_rsb_spin);
402  		log_debug(ls, "find_rsb retry %d %d %s",
403  			  count, dlm_config.ci_new_rsb_count,
404  			  (const char *)name);
405  		return -EAGAIN;
406  	}
407  
408  	r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
409  	list_del(&r->res_hashchain);
410  	/* Convert the empty list_head to a NULL rb_node for tree usage: */
411  	memset(&r->res_hashnode, 0, sizeof(struct rb_node));
412  	ls->ls_new_rsb_count--;
413  	spin_unlock(&ls->ls_new_rsb_spin);
414  
415  	r->res_ls = ls;
416  	r->res_length = len;
417  	memcpy(r->res_name, name, len);
418  	mutex_init(&r->res_mutex);
419  
420  	INIT_LIST_HEAD(&r->res_lookup);
421  	INIT_LIST_HEAD(&r->res_grantqueue);
422  	INIT_LIST_HEAD(&r->res_convertqueue);
423  	INIT_LIST_HEAD(&r->res_waitqueue);
424  	INIT_LIST_HEAD(&r->res_root_list);
425  	INIT_LIST_HEAD(&r->res_recover_list);
426  
427  	*r_ret = r;
428  	return 0;
429  }
430  
431  static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
432  {
433  	char maxname[DLM_RESNAME_MAXLEN];
434  
435  	memset(maxname, 0, DLM_RESNAME_MAXLEN);
436  	memcpy(maxname, name, nlen);
437  	return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
438  }
439  
440  int dlm_search_rsb_tree(struct rb_root *tree, const void *name, int len,
441  			struct dlm_rsb **r_ret)
442  {
443  	struct rb_node *node = tree->rb_node;
444  	struct dlm_rsb *r;
445  	int rc;
446  
447  	while (node) {
448  		r = rb_entry(node, struct dlm_rsb, res_hashnode);
449  		rc = rsb_cmp(r, name, len);
450  		if (rc < 0)
451  			node = node->rb_left;
452  		else if (rc > 0)
453  			node = node->rb_right;
454  		else
455  			goto found;
456  	}
457  	*r_ret = NULL;
458  	return -EBADR;
459  
460   found:
461  	*r_ret = r;
462  	return 0;
463  }
464  
465  static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
466  {
467  	struct rb_node **newn = &tree->rb_node;
468  	struct rb_node *parent = NULL;
469  	int rc;
470  
471  	while (*newn) {
472  		struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
473  					       res_hashnode);
474  
475  		parent = *newn;
476  		rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
477  		if (rc < 0)
478  			newn = &parent->rb_left;
479  		else if (rc > 0)
480  			newn = &parent->rb_right;
481  		else {
482  			log_print("rsb_insert match");
483  			dlm_dump_rsb(rsb);
484  			dlm_dump_rsb(cur);
485  			return -EEXIST;
486  		}
487  	}
488  
489  	rb_link_node(&rsb->res_hashnode, parent, newn);
490  	rb_insert_color(&rsb->res_hashnode, tree);
491  	return 0;
492  }
493  
494  /*
495   * Find rsb in rsbtbl and potentially create/add one
496   *
497   * Delaying the release of rsb's has a similar benefit to applications keeping
498   * NL locks on an rsb, but without the guarantee that the cached master value
499   * will still be valid when the rsb is reused.  Apps aren't always smart enough
500   * to keep NL locks on an rsb that they may lock again shortly; this can lead
501   * to excessive master lookups and removals if we don't delay the release.
502   *
503   * Searching for an rsb means looking through both the normal list and toss
504   * list.  When found on the toss list the rsb is moved to the normal list with
505   * ref count of 1; when found on normal list the ref count is incremented.
506   *
507   * rsb's on the keep list are being used locally and refcounted.
508   * rsb's on the toss list are not being used locally, and are not refcounted.
509   *
510   * The toss list rsb's were either
511   * - previously used locally but not any more (were on keep list, then
512   *   moved to toss list when last refcount dropped)
513   * - created and put on toss list as a directory record for a lookup
514   *   (we are the dir node for the res, but are not using the res right now,
515   *   but some other node is)
516   *
517   * The purpose of find_rsb() is to return a refcounted rsb for local use.
518   * So, if the given rsb is on the toss list, it is moved to the keep list
519   * before being returned.
520   *
521   * toss_rsb() happens when all local usage of the rsb is done, i.e. no
522   * more refcounts exist, so the rsb is moved from the keep list to the
523   * toss list.
524   *
525   * rsb's on both keep and toss lists are used for doing name to master
526   * lookups.  rsb's that are in use locally (and being refcounted) are on
527   * the keep list, rsb's that are not in use locally (not refcounted) and
528   * only exist for name/master lookups are on the toss list.
529   *
530   * rsb's on the toss list whose dir_nodeid is not local can have stale
531   * name/master mappings.  So, remote requests on such rsb's can potentially
532   * return with an error, which means the mapping is stale and needs to
533   * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
534   * first_lkid is to keep only a single outstanding request on an rsb
535   * while that rsb has a potentially stale master.)
536   */
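
/* Summarizing the list transitions described above:

     keep -> toss    last local reference dropped, toss_rsb()
     toss -> keep    find_rsb() needs the rsb for local use again
     toss -> freed   unused for ci_toss_secs, reclaimed by shrink_bucket()
                     (which also sends a remove to the dir node when we
                     were the master but not the dir node)
*/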
537  
538  static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
539  			uint32_t hash, uint32_t b,
540  			int dir_nodeid, int from_nodeid,
541  			unsigned int flags, struct dlm_rsb **r_ret)
542  {
543  	struct dlm_rsb *r = NULL;
544  	int our_nodeid = dlm_our_nodeid();
545  	int from_local = 0;
546  	int from_other = 0;
547  	int from_dir = 0;
548  	int create = 0;
549  	int error;
550  
551  	if (flags & R_RECEIVE_REQUEST) {
552  		if (from_nodeid == dir_nodeid)
553  			from_dir = 1;
554  		else
555  			from_other = 1;
556  	} else if (flags & R_REQUEST) {
557  		from_local = 1;
558  	}
559  
560  	/*
561  	 * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
562  	 * from_nodeid has sent us a lock in dlm_recover_locks, believing
563  	 * we're the new master.  Our local recovery may not have set
564  	 * res_master_nodeid to our_nodeid yet, so allow either.  Don't
565  	 * create the rsb; dlm_recover_process_copy() will handle EBADR
566  	 * by resending.
567  	 *
568  	 * If someone sends us a request, we are the dir node, and we do
569  	 * not find the rsb anywhere, then recreate it.  This happens if
570  	 * someone sends us a request after we have removed/freed an rsb
571  	 * from our toss list.  (They sent a request instead of lookup
572  	 * because they are using an rsb from their toss list.)
573  	 */
574  
575  	if (from_local || from_dir ||
576  	    (from_other && (dir_nodeid == our_nodeid))) {
577  		create = 1;
578  	}
579  
580   retry:
581  	if (create) {
582  		error = pre_rsb_struct(ls);
583  		if (error < 0)
584  			goto out;
585  	}
586  
587  	spin_lock(&ls->ls_rsbtbl[b].lock);
588  
589  	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
590  	if (error)
591  		goto do_toss;
592  
593  	/*
594  	 * rsb is active, so we can't check master_nodeid without lock_rsb.
595  	 */
596  
597  	kref_get(&r->res_ref);
598  	goto out_unlock;
599  
600  
601   do_toss:
602  	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
603  	if (error)
604  		goto do_new;
605  
606  	/*
607  	 * rsb found inactive (master_nodeid may be out of date unless
608  	 * we are the dir_nodeid or were the master).  No other thread
609  	 * is using this rsb because it's on the toss list, so we can
610  	 * look at or update res_master_nodeid without lock_rsb.
611  	 */
612  
613  	if ((r->res_master_nodeid != our_nodeid) && from_other) {
614  		/* our rsb was not master, and another node (not the dir node)
615  		   has sent us a request */
616  		log_debug(ls, "find_rsb toss from_other %d master %d dir %d %s",
617  			  from_nodeid, r->res_master_nodeid, dir_nodeid,
618  			  r->res_name);
619  		error = -ENOTBLK;
620  		goto out_unlock;
621  	}
622  
623  	if ((r->res_master_nodeid != our_nodeid) && from_dir) {
624  		/* don't think this should ever happen */
625  		log_error(ls, "find_rsb toss from_dir %d master %d",
626  			  from_nodeid, r->res_master_nodeid);
627  		dlm_print_rsb(r);
628  		/* fix it and go on */
629  		r->res_master_nodeid = our_nodeid;
630  		r->res_nodeid = 0;
631  		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
632  		r->res_first_lkid = 0;
633  	}
634  
635  	if (from_local && (r->res_master_nodeid != our_nodeid)) {
636  		/* Because we have held no locks on this rsb,
637  		   res_master_nodeid could have become stale. */
638  		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
639  		r->res_first_lkid = 0;
640  	}
641  
642  	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
643  	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
644  	goto out_unlock;
645  
646  
647   do_new:
648  	/*
649  	 * rsb not found
650  	 */
651  
652  	if (error == -EBADR && !create)
653  		goto out_unlock;
654  
655  	error = get_rsb_struct(ls, name, len, &r);
656  	if (error == -EAGAIN) {
657  		spin_unlock(&ls->ls_rsbtbl[b].lock);
658  		goto retry;
659  	}
660  	if (error)
661  		goto out_unlock;
662  
663  	r->res_hash = hash;
664  	r->res_bucket = b;
665  	r->res_dir_nodeid = dir_nodeid;
666  	kref_init(&r->res_ref);
667  
668  	if (from_dir) {
669  		/* want to see how often this happens */
670  		log_debug(ls, "find_rsb new from_dir %d recreate %s",
671  			  from_nodeid, r->res_name);
672  		r->res_master_nodeid = our_nodeid;
673  		r->res_nodeid = 0;
674  		goto out_add;
675  	}
676  
677  	if (from_other && (dir_nodeid != our_nodeid)) {
678  		/* should never happen */
679  		log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
680  			  from_nodeid, dir_nodeid, our_nodeid, r->res_name);
681  		dlm_free_rsb(r);
682  		r = NULL;
683  		error = -ENOTBLK;
684  		goto out_unlock;
685  	}
686  
687  	if (from_other) {
688  		log_debug(ls, "find_rsb new from_other %d dir %d %s",
689  			  from_nodeid, dir_nodeid, r->res_name);
690  	}
691  
692  	if (dir_nodeid == our_nodeid) {
693  		/* When we are the dir nodeid, we can set the master
694  		   node immediately */
695  		r->res_master_nodeid = our_nodeid;
696  		r->res_nodeid = 0;
697  	} else {
698  		/* set_master will send_lookup to dir_nodeid */
699  		r->res_master_nodeid = 0;
700  		r->res_nodeid = -1;
701  	}
702  
703   out_add:
704  	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
705   out_unlock:
706  	spin_unlock(&ls->ls_rsbtbl[b].lock);
707   out:
708  	*r_ret = r;
709  	return error;
710  }
711  
712  /* During recovery, other nodes can send us new MSTCPY locks (from
713     dlm_recover_locks) before we've made ourselves master (in
714     dlm_recover_masters). */
715  
716  static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
717  			  uint32_t hash, uint32_t b,
718  			  int dir_nodeid, int from_nodeid,
719  			  unsigned int flags, struct dlm_rsb **r_ret)
720  {
721  	struct dlm_rsb *r = NULL;
722  	int our_nodeid = dlm_our_nodeid();
723  	int recover = (flags & R_RECEIVE_RECOVER);
724  	int error;
725  
726   retry:
727  	error = pre_rsb_struct(ls);
728  	if (error < 0)
729  		goto out;
730  
731  	spin_lock(&ls->ls_rsbtbl[b].lock);
732  
733  	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
734  	if (error)
735  		goto do_toss;
736  
737  	/*
738  	 * rsb is active, so we can't check master_nodeid without lock_rsb.
739  	 */
740  
741  	kref_get(&r->res_ref);
742  	goto out_unlock;
743  
744  
745   do_toss:
746  	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
747  	if (error)
748  		goto do_new;
749  
750  	/*
751  	 * rsb found inactive. No other thread is using this rsb because
752  	 * it's on the toss list, so we can look at or update
753  	 * res_master_nodeid without lock_rsb.
754  	 */
755  
756  	if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
757  		/* our rsb is not master, and another node has sent us a
758  		   request; this should never happen */
759  		log_error(ls, "find_rsb toss from_nodeid %d master %d dir %d",
760  			  from_nodeid, r->res_master_nodeid, dir_nodeid);
761  		dlm_print_rsb(r);
762  		error = -ENOTBLK;
763  		goto out_unlock;
764  	}
765  
766  	if (!recover && (r->res_master_nodeid != our_nodeid) &&
767  	    (dir_nodeid == our_nodeid)) {
768  		/* our rsb is not master, and we are dir; may as well fix it;
769  		   this should never happen */
770  		log_error(ls, "find_rsb toss our %d master %d dir %d",
771  			  our_nodeid, r->res_master_nodeid, dir_nodeid);
772  		dlm_print_rsb(r);
773  		r->res_master_nodeid = our_nodeid;
774  		r->res_nodeid = 0;
775  	}
776  
777  	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
778  	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
779  	goto out_unlock;
780  
781  
782   do_new:
783  	/*
784  	 * rsb not found
785  	 */
786  
787  	error = get_rsb_struct(ls, name, len, &r);
788  	if (error == -EAGAIN) {
789  		spin_unlock(&ls->ls_rsbtbl[b].lock);
790  		goto retry;
791  	}
792  	if (error)
793  		goto out_unlock;
794  
795  	r->res_hash = hash;
796  	r->res_bucket = b;
797  	r->res_dir_nodeid = dir_nodeid;
798  	r->res_master_nodeid = dir_nodeid;
799  	r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
800  	kref_init(&r->res_ref);
801  
802  	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
803   out_unlock:
804  	spin_unlock(&ls->ls_rsbtbl[b].lock);
805   out:
806  	*r_ret = r;
807  	return error;
808  }
809  
810  static int find_rsb(struct dlm_ls *ls, const void *name, int len,
811  		    int from_nodeid, unsigned int flags,
812  		    struct dlm_rsb **r_ret)
813  {
814  	uint32_t hash, b;
815  	int dir_nodeid;
816  
817  	if (len > DLM_RESNAME_MAXLEN)
818  		return -EINVAL;
819  
820  	hash = jhash(name, len, 0);
821  	b = hash & (ls->ls_rsbtbl_size - 1);
822  
823  	dir_nodeid = dlm_hash2nodeid(ls, hash);
824  
825  	if (dlm_no_directory(ls))
826  		return find_rsb_nodir(ls, name, len, hash, b, dir_nodeid,
827  				      from_nodeid, flags, r_ret);
828  	else
829  		return find_rsb_dir(ls, name, len, hash, b, dir_nodeid,
830  				      from_nodeid, flags, r_ret);
831  }
832  
833  /* we have received a request and found that res_master_nodeid != our_nodeid,
834     so we need to return an error or make ourselves the master */
835  
836  static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
837  				  int from_nodeid)
838  {
839  	if (dlm_no_directory(ls)) {
840  		log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
841  			  from_nodeid, r->res_master_nodeid,
842  			  r->res_dir_nodeid);
843  		dlm_print_rsb(r);
844  		return -ENOTBLK;
845  	}
846  
847  	if (from_nodeid != r->res_dir_nodeid) {
848  		/* our rsb is not master, and another node (not the dir node)
849  	   	   has sent us a request.  this is much more common when our
850  	   	   master_nodeid is zero, so limit debug to non-zero.  */
851  
852  		if (r->res_master_nodeid) {
853  			log_debug(ls, "validate master from_other %d master %d "
854  				  "dir %d first %x %s", from_nodeid,
855  				  r->res_master_nodeid, r->res_dir_nodeid,
856  				  r->res_first_lkid, r->res_name);
857  		}
858  		return -ENOTBLK;
859  	} else {
860  		/* our rsb is not master, but the dir nodeid has sent us a
861  	   	   request; this could happen with master 0 / res_nodeid -1 */
862  
863  		if (r->res_master_nodeid) {
864  			log_error(ls, "validate master from_dir %d master %d "
865  				  "first %x %s",
866  				  from_nodeid, r->res_master_nodeid,
867  				  r->res_first_lkid, r->res_name);
868  		}
869  
870  		r->res_master_nodeid = dlm_our_nodeid();
871  		r->res_nodeid = 0;
872  		return 0;
873  	}
874  }
875  
876  static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
877  				int from_nodeid, bool toss_list, unsigned int flags,
878  				int *r_nodeid, int *result)
879  {
880  	int fix_master = (flags & DLM_LU_RECOVER_MASTER);
881  	int from_master = (flags & DLM_LU_RECOVER_DIR);
882  
883  	if (r->res_dir_nodeid != our_nodeid) {
884  		/* should not happen, but may as well fix it and carry on */
885  		log_error(ls, "%s res_dir %d our %d %s", __func__,
886  			  r->res_dir_nodeid, our_nodeid, r->res_name);
887  		r->res_dir_nodeid = our_nodeid;
888  	}
889  
890  	if (fix_master && dlm_is_removed(ls, r->res_master_nodeid)) {
891  		/* Recovery uses this function to set a new master when
892  		 * the previous master failed.  Setting NEW_MASTER will
893  		 * force dlm_recover_masters to call recover_master on this
894  		 * rsb even though the res_nodeid is no longer removed.
895  		 */
896  
897  		r->res_master_nodeid = from_nodeid;
898  		r->res_nodeid = from_nodeid;
899  		rsb_set_flag(r, RSB_NEW_MASTER);
900  
901  		if (toss_list) {
902  			/* I don't think we should ever find it on the toss list. */
903  			log_error(ls, "%s fix_master on toss", __func__);
904  			dlm_dump_rsb(r);
905  		}
906  	}
907  
908  	if (from_master && (r->res_master_nodeid != from_nodeid)) {
909  		/* this will happen if from_nodeid became master during
910  		 * a previous recovery cycle, and we aborted the previous
911  		 * cycle before recovering this master value
912  		 */
913  
914  		log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
915  			  __func__, from_nodeid, r->res_master_nodeid,
916  			  r->res_nodeid, r->res_first_lkid, r->res_name);
917  
918  		if (r->res_master_nodeid == our_nodeid) {
919  			log_error(ls, "from_master %d our_master", from_nodeid);
920  			dlm_dump_rsb(r);
921  			goto ret_assign;
922  		}
923  
924  		r->res_master_nodeid = from_nodeid;
925  		r->res_nodeid = from_nodeid;
926  		rsb_set_flag(r, RSB_NEW_MASTER);
927  	}
928  
929  	if (!r->res_master_nodeid) {
930  		/* this will happen if recovery happens while we're looking
931  		 * up the master for this rsb
932  		 */
933  
934  		log_debug(ls, "%s master 0 to %d first %x %s", __func__,
935  			  from_nodeid, r->res_first_lkid, r->res_name);
936  		r->res_master_nodeid = from_nodeid;
937  		r->res_nodeid = from_nodeid;
938  	}
939  
940  	if (!from_master && !fix_master &&
941  	    (r->res_master_nodeid == from_nodeid)) {
942  		/* this can happen when the master sends remove, the dir node
943  		 * finds the rsb on the keep list and ignores the remove,
944  		 * and the former master sends a lookup
945  		 */
946  
947  		log_limit(ls, "%s from master %d flags %x first %x %s",
948  			  __func__, from_nodeid, flags, r->res_first_lkid,
949  			  r->res_name);
950  	}
951  
952   ret_assign:
953  	*r_nodeid = r->res_master_nodeid;
954  	if (result)
955  		*result = DLM_LU_MATCH;
956  }
957  
958  /*
959   * We're the dir node for this res and another node wants to know the
960   * master nodeid.  During normal operation (non recovery) this is only
961   * called from receive_lookup(); master lookups when the local node is
962   * the dir node are done by find_rsb().
963   *
964   * normal operation, we are the dir node for a resource
965   * . _request_lock
966   * . set_master
967   * . send_lookup
968   * . receive_lookup
969   * . dlm_master_lookup flags 0
970   *
971   * recover directory, we are rebuilding dir for all resources
972   * . dlm_recover_directory
973   * . dlm_rcom_names
974   *   remote node sends back the rsb names it is master of and we are dir of
975   * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
976   *   we either create new rsb setting remote node as master, or find existing
977   *   rsb and set master to be the remote node.
978   *
979   * recover masters, we are finding the new master for resources
980   * . dlm_recover_masters
981   * . recover_master
982   * . dlm_send_rcom_lookup
983   * . receive_rcom_lookup
984   * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
985   */
986  
987  int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
988  		      int len, unsigned int flags, int *r_nodeid, int *result)
989  {
990  	struct dlm_rsb *r = NULL;
991  	uint32_t hash, b;
992  	int our_nodeid = dlm_our_nodeid();
993  	int dir_nodeid, error;
994  
995  	if (len > DLM_RESNAME_MAXLEN)
996  		return -EINVAL;
997  
998  	if (from_nodeid == our_nodeid) {
999  		log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
1000  			  our_nodeid, flags);
1001  		return -EINVAL;
1002  	}
1003  
1004  	hash = jhash(name, len, 0);
1005  	b = hash & (ls->ls_rsbtbl_size - 1);
1006  
1007  	dir_nodeid = dlm_hash2nodeid(ls, hash);
1008  	if (dir_nodeid != our_nodeid) {
1009  		log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
1010  			  from_nodeid, dir_nodeid, our_nodeid, hash,
1011  			  ls->ls_num_nodes);
1012  		*r_nodeid = -1;
1013  		return -EINVAL;
1014  	}
1015  
1016   retry:
1017  	error = pre_rsb_struct(ls);
1018  	if (error < 0)
1019  		return error;
1020  
1021  	spin_lock(&ls->ls_rsbtbl[b].lock);
1022  	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1023  	if (!error) {
1024  		/* because the rsb is active, we need to lock_rsb before
1025  		 * checking/changing res_master_nodeid
1026  		 */
1027  
1028  		hold_rsb(r);
1029  		spin_unlock(&ls->ls_rsbtbl[b].lock);
1030  		lock_rsb(r);
1031  
1032  		__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
1033  				    flags, r_nodeid, result);
1034  
1035  		/* the rsb was active */
1036  		unlock_rsb(r);
1037  		put_rsb(r);
1038  
1039  		return 0;
1040  	}
1041  
1042  	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1043  	if (error)
1044  		goto not_found;
1045  
1046  	/* because the rsb is inactive (on toss list), it's not refcounted
1047  	 * and lock_rsb is not used, but is protected by the rsbtbl lock
1048  	 */
1049  
1050  	__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
1051  			    r_nodeid, result);
1052  
1053  	r->res_toss_time = jiffies;
1054  	/* the rsb was inactive (on toss list) */
1055  	spin_unlock(&ls->ls_rsbtbl[b].lock);
1056  
1057  	return 0;
1058  
1059   not_found:
1060  	error = get_rsb_struct(ls, name, len, &r);
1061  	if (error == -EAGAIN) {
1062  		spin_unlock(&ls->ls_rsbtbl[b].lock);
1063  		goto retry;
1064  	}
1065  	if (error)
1066  		goto out_unlock;
1067  
1068  	r->res_hash = hash;
1069  	r->res_bucket = b;
1070  	r->res_dir_nodeid = our_nodeid;
1071  	r->res_master_nodeid = from_nodeid;
1072  	r->res_nodeid = from_nodeid;
1073  	kref_init(&r->res_ref);
1074  	r->res_toss_time = jiffies;
1075  
1076  	error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
1077  	if (error) {
1078  		/* should never happen */
1079  		dlm_free_rsb(r);
1080  		spin_unlock(&ls->ls_rsbtbl[b].lock);
1081  		goto retry;
1082  	}
1083  
1084  	if (result)
1085  		*result = DLM_LU_ADD;
1086  	*r_nodeid = from_nodeid;
1087   out_unlock:
1088  	spin_unlock(&ls->ls_rsbtbl[b].lock);
1089  	return error;
1090  }
1091  
1092  static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1093  {
1094  	struct rb_node *n;
1095  	struct dlm_rsb *r;
1096  	int i;
1097  
1098  	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1099  		spin_lock(&ls->ls_rsbtbl[i].lock);
1100  		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
1101  			r = rb_entry(n, struct dlm_rsb, res_hashnode);
1102  			if (r->res_hash == hash)
1103  				dlm_dump_rsb(r);
1104  		}
1105  		spin_unlock(&ls->ls_rsbtbl[i].lock);
1106  	}
1107  }
1108  
1109  void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
1110  {
1111  	struct dlm_rsb *r = NULL;
1112  	uint32_t hash, b;
1113  	int error;
1114  
1115  	hash = jhash(name, len, 0);
1116  	b = hash & (ls->ls_rsbtbl_size - 1);
1117  
1118  	spin_lock(&ls->ls_rsbtbl[b].lock);
1119  	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
1120  	if (!error)
1121  		goto out_dump;
1122  
1123  	error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1124  	if (error)
1125  		goto out;
1126   out_dump:
1127  	dlm_dump_rsb(r);
1128   out:
1129  	spin_unlock(&ls->ls_rsbtbl[b].lock);
1130  }
1131  
1132  static void toss_rsb(struct kref *kref)
1133  {
1134  	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1135  	struct dlm_ls *ls = r->res_ls;
1136  
1137  	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1138  	kref_init(&r->res_ref);
1139  	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
1140  	rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
1141  	r->res_toss_time = jiffies;
1142  	set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[r->res_bucket].flags);
1143  	if (r->res_lvbptr) {
1144  		dlm_free_lvb(r->res_lvbptr);
1145  		r->res_lvbptr = NULL;
1146  	}
1147  }
1148  
1149  /* See comment for unhold_lkb */
1150  
1151  static void unhold_rsb(struct dlm_rsb *r)
1152  {
1153  	int rv;
1154  	rv = kref_put(&r->res_ref, toss_rsb);
1155  	DLM_ASSERT(!rv, dlm_dump_rsb(r););
1156  }
1157  
1158  static void kill_rsb(struct kref *kref)
1159  {
1160  	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1161  
1162  	/* All work is done after the return from kref_put() so we
1163  	   can release the write_lock before the remove and free. */
1164  
1165  	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1166  	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1167  	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1168  	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1169  	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1170  	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1171  }
1172  
1173  /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1174     The rsb must exist as long as any lkb's for it do. */
1175  
1176  static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1177  {
1178  	hold_rsb(r);
1179  	lkb->lkb_resource = r;
1180  }
1181  
1182  static void detach_lkb(struct dlm_lkb *lkb)
1183  {
1184  	if (lkb->lkb_resource) {
1185  		put_rsb(lkb->lkb_resource);
1186  		lkb->lkb_resource = NULL;
1187  	}
1188  }
1189  
1190  static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
1191  		       int start, int end)
1192  {
1193  	struct dlm_lkb *lkb;
1194  	int rv;
1195  
1196  	lkb = dlm_allocate_lkb(ls);
1197  	if (!lkb)
1198  		return -ENOMEM;
1199  
1200  	lkb->lkb_last_bast_mode = -1;
1201  	lkb->lkb_nodeid = -1;
1202  	lkb->lkb_grmode = DLM_LOCK_IV;
1203  	kref_init(&lkb->lkb_ref);
1204  	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1205  	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1206  	INIT_LIST_HEAD(&lkb->lkb_cb_list);
1207  	INIT_LIST_HEAD(&lkb->lkb_callbacks);
1208  	spin_lock_init(&lkb->lkb_cb_lock);
1209  	INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
1210  
1211  	idr_preload(GFP_NOFS);
1212  	spin_lock(&ls->ls_lkbidr_spin);
1213  	rv = idr_alloc(&ls->ls_lkbidr, lkb, start, end, GFP_NOWAIT);
1214  	if (rv >= 0)
1215  		lkb->lkb_id = rv;
1216  	spin_unlock(&ls->ls_lkbidr_spin);
1217  	idr_preload_end();
1218  
1219  	if (rv < 0) {
1220  		log_error(ls, "create_lkb idr error %d", rv);
1221  		dlm_free_lkb(lkb);
1222  		return rv;
1223  	}
1224  
1225  	*lkb_ret = lkb;
1226  	return 0;
1227  }
1228  
1229  static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1230  {
1231  	return _create_lkb(ls, lkb_ret, 1, 0);
1232  }
1233  
1234  static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1235  {
1236  	struct dlm_lkb *lkb;
1237  
1238  	spin_lock(&ls->ls_lkbidr_spin);
1239  	lkb = idr_find(&ls->ls_lkbidr, lkid);
1240  	if (lkb)
1241  		kref_get(&lkb->lkb_ref);
1242  	spin_unlock(&ls->ls_lkbidr_spin);
1243  
1244  	*lkb_ret = lkb;
1245  	return lkb ? 0 : -ENOENT;
1246  }
1247  
1248  static void kill_lkb(struct kref *kref)
1249  {
1250  	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1251  
1252  	/* All work is done after the return from kref_put() so we
1253  	   can release the write_lock before the detach_lkb */
1254  
1255  	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1256  }
1257  
1258  /* __put_lkb() is used when an lkb may not have an rsb attached to
1259     it so we need to provide the lockspace explicitly */
1260  
1261  static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1262  {
1263  	uint32_t lkid = lkb->lkb_id;
1264  	int rv;
1265  
1266  	rv = kref_put_lock(&lkb->lkb_ref, kill_lkb,
1267  			   &ls->ls_lkbidr_spin);
1268  	if (rv) {
1269  		idr_remove(&ls->ls_lkbidr, lkid);
1270  		spin_unlock(&ls->ls_lkbidr_spin);
1271  
1272  		detach_lkb(lkb);
1273  
1274  		/* for local/process lkbs, lvbptr points to caller's lksb */
1275  		if (lkb->lkb_lvbptr && is_master_copy(lkb))
1276  			dlm_free_lvb(lkb->lkb_lvbptr);
1277  		dlm_free_lkb(lkb);
1278  	}
1279  
1280  	return rv;
1281  }
1282  
1283  int dlm_put_lkb(struct dlm_lkb *lkb)
1284  {
1285  	struct dlm_ls *ls;
1286  
1287  	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1288  	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1289  
1290  	ls = lkb->lkb_resource->res_ls;
1291  	return __put_lkb(ls, lkb);
1292  }
1293  
1294  /* This is only called to add a reference when the code already holds
1295     a valid reference to the lkb, so there's no need for locking. */
1296  
1297  static inline void hold_lkb(struct dlm_lkb *lkb)
1298  {
1299  	kref_get(&lkb->lkb_ref);
1300  }
1301  
1302  static void unhold_lkb_assert(struct kref *kref)
1303  {
1304  	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1305  
1306  	DLM_ASSERT(false, dlm_print_lkb(lkb););
1307  }
1308  
1309  /* This is called when we need to remove a reference and are certain
1310     it's not the last ref.  e.g. del_lkb is always called between a
1311     find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1312     put_lkb would work fine, but would involve unnecessary locking */
1313  
1314  static inline void unhold_lkb(struct dlm_lkb *lkb)
1315  {
1316  	kref_put(&lkb->lkb_ref, unhold_lkb_assert);
1317  }
1318  
1319  static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1320  			    int mode)
1321  {
1322  	struct dlm_lkb *lkb = NULL, *iter;
1323  
1324  	list_for_each_entry(iter, head, lkb_statequeue)
1325  		if (iter->lkb_rqmode < mode) {
1326  			lkb = iter;
1327  			list_add_tail(new, &iter->lkb_statequeue);
1328  			break;
1329  		}
1330  
1331  	if (!lkb)
1332  		list_add_tail(new, head);
1333  }
1334  
1335  /* add/remove lkb to rsb's grant/convert/wait queue */
1336  
1337  static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1338  {
1339  	kref_get(&lkb->lkb_ref);
1340  
1341  	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1342  
1343  	lkb->lkb_timestamp = ktime_get();
1344  
1345  	lkb->lkb_status = status;
1346  
1347  	switch (status) {
1348  	case DLM_LKSTS_WAITING:
1349  		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1350  			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1351  		else
1352  			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1353  		break;
1354  	case DLM_LKSTS_GRANTED:
1355  		/* convention says granted locks kept in order of grmode */
1356  		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1357  				lkb->lkb_grmode);
1358  		break;
1359  	case DLM_LKSTS_CONVERT:
1360  		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1361  			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1362  		else
1363  			list_add_tail(&lkb->lkb_statequeue,
1364  				      &r->res_convertqueue);
1365  		break;
1366  	default:
1367  		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1368  	}
1369  }
1370  
1371  static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1372  {
1373  	lkb->lkb_status = 0;
1374  	list_del(&lkb->lkb_statequeue);
1375  	unhold_lkb(lkb);
1376  }
1377  
1378  static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1379  {
1380  	hold_lkb(lkb);
1381  	del_lkb(r, lkb);
1382  	add_lkb(r, lkb, sts);
1383  	unhold_lkb(lkb);
1384  }
1385  
1386  static int msg_reply_type(int mstype)
1387  {
1388  	switch (mstype) {
1389  	case DLM_MSG_REQUEST:
1390  		return DLM_MSG_REQUEST_REPLY;
1391  	case DLM_MSG_CONVERT:
1392  		return DLM_MSG_CONVERT_REPLY;
1393  	case DLM_MSG_UNLOCK:
1394  		return DLM_MSG_UNLOCK_REPLY;
1395  	case DLM_MSG_CANCEL:
1396  		return DLM_MSG_CANCEL_REPLY;
1397  	case DLM_MSG_LOOKUP:
1398  		return DLM_MSG_LOOKUP_REPLY;
1399  	}
1400  	return -1;
1401  }
1402  
1403  /* add/remove lkb from global waiters list of lkb's waiting for
1404     a reply from a remote node */
1405  
1406  static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1407  {
1408  	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1409  	int error = 0;
1410  	int wc;
1411  
1412  	mutex_lock(&ls->ls_waiters_mutex);
1413  
1414  	if (is_overlap_unlock(lkb) ||
1415  	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
1416  		error = -EINVAL;
1417  		goto out;
1418  	}
1419  
1420  	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1421  		switch (mstype) {
1422  		case DLM_MSG_UNLOCK:
1423  			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
1424  			break;
1425  		case DLM_MSG_CANCEL:
1426  			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
1427  			break;
1428  		default:
1429  			error = -EBUSY;
1430  			goto out;
1431  		}
1432  		wc = atomic_inc_return(&lkb->lkb_wait_count);
1433  		hold_lkb(lkb);
1434  
1435  		log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1436  			  lkb->lkb_id, lkb->lkb_wait_type, mstype, wc,
1437  			  dlm_iflags_val(lkb));
1438  		goto out;
1439  	}
1440  
1441  	wc = atomic_fetch_inc(&lkb->lkb_wait_count);
1442  	DLM_ASSERT(!wc, dlm_print_lkb(lkb); printk("wait_count %d\n", wc););
1443  	lkb->lkb_wait_type = mstype;
1444  	lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1445  	hold_lkb(lkb);
1446  	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1447   out:
1448  	if (error)
1449  		log_error(ls, "addwait error %x %d flags %x %d %d %s",
1450  			  lkb->lkb_id, error, dlm_iflags_val(lkb), mstype,
1451  			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
1452  	mutex_unlock(&ls->ls_waiters_mutex);
1453  	return error;
1454  }
1455  
1456  /* We clear the RESEND flag because we might be taking an lkb off the waiters
1457     list as part of process_requestqueue (e.g. a lookup that has an optimized
1458     request reply on the requestqueue) between dlm_recover_waiters_pre() which
1459     set RESEND and dlm_recover_waiters_post() */
1460  
1461  static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1462  				const struct dlm_message *ms)
1463  {
1464  	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1465  	int overlap_done = 0;
1466  
1467  	if (mstype == DLM_MSG_UNLOCK_REPLY &&
1468  	    test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
1469  		log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1470  		overlap_done = 1;
1471  		goto out_del;
1472  	}
1473  
1474  	if (mstype == DLM_MSG_CANCEL_REPLY &&
1475  	    test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1476  		log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1477  		overlap_done = 1;
1478  		goto out_del;
1479  	}
1480  
1481  	/* Cancel state was preemptively cleared by a successful convert,
1482  	   see next comment, nothing to do. */
1483  
1484  	if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1485  	    (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1486  		log_debug(ls, "remwait %x cancel_reply wait_type %d",
1487  			  lkb->lkb_id, lkb->lkb_wait_type);
1488  		return -1;
1489  	}
1490  
1491  	/* Remove for the convert reply, and preemptively remove for the
1492  	   cancel reply.  A convert has been granted while there's still
1493  	   an outstanding cancel on it (the cancel is moot and the result
1494  	   in the cancel reply should be 0).  We preempt the cancel reply
1495  	   because the app gets the convert result and then can follow up
1496  	   with another op, like convert.  This subsequent op would see the
1497  	   lingering state of the cancel and fail with -EBUSY. */
1498  
1499  	if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1500  	    (lkb->lkb_wait_type == DLM_MSG_CONVERT) && ms && !ms->m_result &&
1501  	    test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1502  		log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1503  			  lkb->lkb_id);
1504  		lkb->lkb_wait_type = 0;
1505  		atomic_dec(&lkb->lkb_wait_count);
1506  		unhold_lkb(lkb);
1507  		goto out_del;
1508  	}
1509  
1510  	/* N.B. type of reply may not always correspond to type of original
1511  	   msg due to lookup->request optimization, verify others? */
1512  
1513  	if (lkb->lkb_wait_type) {
1514  		lkb->lkb_wait_type = 0;
1515  		goto out_del;
1516  	}
1517  
1518  	log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1519  		  lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
1520  		  lkb->lkb_remid, mstype, dlm_iflags_val(lkb));
1521  	return -1;
1522  
1523   out_del:
1524  	/* the force-unlock/cancel has completed and we haven't recvd a reply
1525  	/* the force-unlock/cancel has completed and we haven't received a reply
1526  	   give up on any reply to the earlier op.  FIXME: not sure when/how
1527  	   this would happen */
1528  
1529  	if (overlap_done && lkb->lkb_wait_type) {
1530  		log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1531  			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
1532  		atomic_dec(&lkb->lkb_wait_count);
1533  		unhold_lkb(lkb);
1534  		lkb->lkb_wait_type = 0;
1535  	}
1536  
1537  	DLM_ASSERT(atomic_read(&lkb->lkb_wait_count), dlm_print_lkb(lkb););
1538  
1539  	clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
1540  	if (atomic_dec_and_test(&lkb->lkb_wait_count))
1541  		list_del_init(&lkb->lkb_wait_reply);
1542  	unhold_lkb(lkb);
1543  	return 0;
1544  }
1545  
1546  static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1547  {
1548  	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1549  	int error;
1550  
1551  	mutex_lock(&ls->ls_waiters_mutex);
1552  	error = _remove_from_waiters(lkb, mstype, NULL);
1553  	mutex_unlock(&ls->ls_waiters_mutex);
1554  	return error;
1555  }
1556  
1557  /* Handles situations where we might be processing a "fake" or "local" reply in
1558     which we can't try to take waiters_mutex again. */
1559  
1560  static int remove_from_waiters_ms(struct dlm_lkb *lkb,
1561  				  const struct dlm_message *ms, bool local)
1562  {
1563  	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1564  	int error;
1565  
1566  	if (!local)
1567  		mutex_lock(&ls->ls_waiters_mutex);
1568  	error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
1569  	if (!local)
1570  		mutex_unlock(&ls->ls_waiters_mutex);
1571  	return error;
1572  }
1573  
1574  static void shrink_bucket(struct dlm_ls *ls, int b)
1575  {
1576  	struct rb_node *n, *next;
1577  	struct dlm_rsb *r;
1578  	char *name;
1579  	int our_nodeid = dlm_our_nodeid();
1580  	int remote_count = 0;
1581  	int need_shrink = 0;
1582  	int i, len, rv;
1583  
1584  	memset(&ls->ls_remove_lens, 0, sizeof(int) * DLM_REMOVE_NAMES_MAX);
1585  
1586  	spin_lock(&ls->ls_rsbtbl[b].lock);
1587  
1588  	if (!test_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags)) {
1589  		spin_unlock(&ls->ls_rsbtbl[b].lock);
1590  		return;
1591  	}
1592  
1593  	for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
1594  		next = rb_next(n);
1595  		r = rb_entry(n, struct dlm_rsb, res_hashnode);
1596  
1597  		/* If we're the directory record for this rsb, and
1598  		   we're not the master of it, then we need to wait
1599  		   for the master node to send us a dir remove
1600  		   before removing the dir record. */
1601  
1602  		if (!dlm_no_directory(ls) &&
1603  		    (r->res_master_nodeid != our_nodeid) &&
1604  		    (dlm_dir_nodeid(r) == our_nodeid)) {
1605  			continue;
1606  		}
1607  
1608  		need_shrink = 1;
1609  
1610  		if (!time_after_eq(jiffies, r->res_toss_time +
1611  				   dlm_config.ci_toss_secs * HZ)) {
1612  			continue;
1613  		}
1614  
1615  		if (!dlm_no_directory(ls) &&
1616  		    (r->res_master_nodeid == our_nodeid) &&
1617  		    (dlm_dir_nodeid(r) != our_nodeid)) {
1618  
1619  			/* We're the master of this rsb but we're not
1620  			   the directory record, so we need to tell the
1621  			   dir node to remove the dir record. */
1622  
1623  			ls->ls_remove_lens[remote_count] = r->res_length;
1624  			memcpy(ls->ls_remove_names[remote_count], r->res_name,
1625  			       DLM_RESNAME_MAXLEN);
1626  			remote_count++;
1627  
1628  			if (remote_count >= DLM_REMOVE_NAMES_MAX)
1629  				break;
1630  			continue;
1631  		}
1632  
1633  		if (!kref_put(&r->res_ref, kill_rsb)) {
1634  			log_error(ls, "tossed rsb in use %s", r->res_name);
1635  			continue;
1636  		}
1637  
1638  		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1639  		dlm_free_rsb(r);
1640  	}
1641  
1642  	if (need_shrink)
1643  		set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
1644  	else
1645  		clear_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[b].flags);
1646  	spin_unlock(&ls->ls_rsbtbl[b].lock);
1647  
1648  	/*
1649  	 * While searching for rsb's to free, we found some that require
1650  	 * remote removal.  We leave them in place and find them again here
1651  	 * so there is a very small gap between removing them from the toss
1652  	 * list and sending the removal.  Keeping this gap small is
1653  	 * important to keep us (the master node) from being out of sync
1654  	 * with the remote dir node for very long.
1655  	 */
1656  
1657  	for (i = 0; i < remote_count; i++) {
1658  		name = ls->ls_remove_names[i];
1659  		len = ls->ls_remove_lens[i];
1660  
1661  		spin_lock(&ls->ls_rsbtbl[b].lock);
1662  		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
1663  		if (rv) {
1664  			spin_unlock(&ls->ls_rsbtbl[b].lock);
1665  			log_debug(ls, "remove_name not toss %s", name);
1666  			continue;
1667  		}
1668  
1669  		if (r->res_master_nodeid != our_nodeid) {
1670  			spin_unlock(&ls->ls_rsbtbl[b].lock);
1671  			log_debug(ls, "remove_name master %d dir %d our %d %s",
1672  				  r->res_master_nodeid, r->res_dir_nodeid,
1673  				  our_nodeid, name);
1674  			continue;
1675  		}
1676  
1677  		if (r->res_dir_nodeid == our_nodeid) {
1678  			/* should never happen */
1679  			spin_unlock(&ls->ls_rsbtbl[b].lock);
1680  			log_error(ls, "remove_name dir %d master %d our %d %s",
1681  				  r->res_dir_nodeid, r->res_master_nodeid,
1682  				  our_nodeid, name);
1683  			continue;
1684  		}
1685  
1686  		if (!time_after_eq(jiffies, r->res_toss_time +
1687  				   dlm_config.ci_toss_secs * HZ)) {
1688  			spin_unlock(&ls->ls_rsbtbl[b].lock);
1689  			log_debug(ls, "remove_name toss_time %lu now %lu %s",
1690  				  r->res_toss_time, jiffies, name);
1691  			continue;
1692  		}
1693  
1694  		if (!kref_put(&r->res_ref, kill_rsb)) {
1695  			spin_unlock(&ls->ls_rsbtbl[b].lock);
1696  			log_error(ls, "remove_name in use %s", name);
1697  			continue;
1698  		}
1699  
1700  		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
1701  		send_remove(r);
1702  		spin_unlock(&ls->ls_rsbtbl[b].lock);
1703  
1704  		dlm_free_rsb(r);
1705  	}
1706  }
1707  
1708  void dlm_scan_rsbs(struct dlm_ls *ls)
1709  {
1710  	int i;
1711  
1712  	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1713  		shrink_bucket(ls, i);
1714  		if (dlm_locking_stopped(ls))
1715  			break;
1716  		cond_resched();
1717  	}
1718  }
1719  
1720  /* lkb is master or local copy */
1721  
1722  static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1723  {
1724  	int b, len = r->res_ls->ls_lvblen;
1725  
1726  	/* b=1 lvb returned to caller
1727  	   b=0 lvb written to rsb or invalidated
1728  	   b=-1 do nothing */
1729  
1730  	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1731  
1732  	if (b == 1) {
1733  		if (!lkb->lkb_lvbptr)
1734  			return;
1735  
1736  		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1737  			return;
1738  
1739  		if (!r->res_lvbptr)
1740  			return;
1741  
1742  		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1743  		lkb->lkb_lvbseq = r->res_lvbseq;
1744  
1745  	} else if (b == 0) {
1746  		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1747  			rsb_set_flag(r, RSB_VALNOTVALID);
1748  			return;
1749  		}
1750  
1751  		if (!lkb->lkb_lvbptr)
1752  			return;
1753  
1754  		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1755  			return;
1756  
1757  		if (!r->res_lvbptr)
1758  			r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1759  
1760  		if (!r->res_lvbptr)
1761  			return;
1762  
1763  		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1764  		r->res_lvbseq++;
1765  		lkb->lkb_lvbseq = r->res_lvbseq;
1766  		rsb_clear_flag(r, RSB_VALNOTVALID);
1767  	}
1768  
1769  	if (rsb_flag(r, RSB_VALNOTVALID))
1770  		set_bit(DLM_SBF_VALNOTVALID_BIT, &lkb->lkb_sbflags);
1771  }
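
/* An illustrative reading of the b values above (hedged; the authoritative
   mapping is dlm_lvb_operations[] in lvb_table.h): a new request granted
   in PR would typically take b == 1, copying the rsb's lvb back into the
   caller's lvb buffer, while an EX holder converting down to NL would
   typically take b == 0, writing the caller's lvb into the rsb and
   bumping res_lvbseq. */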
1772  
1773  static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1774  {
1775  	if (lkb->lkb_grmode < DLM_LOCK_PW)
1776  		return;
1777  
1778  	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1779  		rsb_set_flag(r, RSB_VALNOTVALID);
1780  		return;
1781  	}
1782  
1783  	if (!lkb->lkb_lvbptr)
1784  		return;
1785  
1786  	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1787  		return;
1788  
1789  	if (!r->res_lvbptr)
1790  		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1791  
1792  	if (!r->res_lvbptr)
1793  		return;
1794  
1795  	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1796  	r->res_lvbseq++;
1797  	rsb_clear_flag(r, RSB_VALNOTVALID);
1798  }
1799  
1800  /* lkb is process copy (pc) */
1801  
1802  static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1803  			    const struct dlm_message *ms)
1804  {
1805  	int b;
1806  
1807  	if (!lkb->lkb_lvbptr)
1808  		return;
1809  
1810  	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1811  		return;
1812  
1813  	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1814  	if (b == 1) {
1815  		int len = receive_extralen(ms);
1816  		if (len > r->res_ls->ls_lvblen)
1817  			len = r->res_ls->ls_lvblen;
1818  		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1819  		lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
1820  	}
1821  }
1822  
1823  /* Manipulate lkb's on rsb's convert/granted/waiting queues
1824     remove_lock -- used for unlock, removes lkb from granted
1825     revert_lock -- used for cancel, moves lkb from convert to granted
1826     grant_lock  -- used for request and convert, adds lkb to granted or
1827                    moves lkb from convert or waiting to granted
1828  
1829     Each of these is used for master or local copy lkb's.  There is
1830     also a _pc() variation used to make the corresponding change on
1831     a process copy (pc) lkb. */
1832  
1833  static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1834  {
1835  	del_lkb(r, lkb);
1836  	lkb->lkb_grmode = DLM_LOCK_IV;
1837  	/* this unhold undoes the original ref from create_lkb()
1838  	   so this leads to the lkb being freed */
1839  	unhold_lkb(lkb);
1840  }
1841  
1842  static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1843  {
1844  	set_lvb_unlock(r, lkb);
1845  	_remove_lock(r, lkb);
1846  }
1847  
1848  static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1849  {
1850  	_remove_lock(r, lkb);
1851  }
1852  
1853  /* returns: 0 did nothing
1854  	    1 moved lock to granted
1855  	   -1 removed lock */
1856  
1857  static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1858  {
1859  	int rv = 0;
1860  
1861  	lkb->lkb_rqmode = DLM_LOCK_IV;
1862  
1863  	switch (lkb->lkb_status) {
1864  	case DLM_LKSTS_GRANTED:
1865  		break;
1866  	case DLM_LKSTS_CONVERT:
1867  		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1868  		rv = 1;
1869  		break;
1870  	case DLM_LKSTS_WAITING:
1871  		del_lkb(r, lkb);
1872  		lkb->lkb_grmode = DLM_LOCK_IV;
1873  		/* this unhold undoes the original ref from create_lkb()
1874  		   so this leads to the lkb being freed */
1875  		unhold_lkb(lkb);
1876  		rv = -1;
1877  		break;
1878  	default:
1879  		log_print("invalid status for revert %d", lkb->lkb_status);
1880  	}
1881  	return rv;
1882  }
1883  
1884  static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1885  {
1886  	return revert_lock(r, lkb);
1887  }
1888  
1889  static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1890  {
1891  	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1892  		lkb->lkb_grmode = lkb->lkb_rqmode;
1893  		if (lkb->lkb_status)
1894  			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1895  		else
1896  			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1897  	}
1898  
1899  	lkb->lkb_rqmode = DLM_LOCK_IV;
1900  	lkb->lkb_highbast = 0;
1901  }
1902  
1903  static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1904  {
1905  	set_lvb_lock(r, lkb);
1906  	_grant_lock(r, lkb);
1907  }
1908  
1909  static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1910  			  const struct dlm_message *ms)
1911  {
1912  	set_lvb_lock_pc(r, lkb, ms);
1913  	_grant_lock(r, lkb);
1914  }
1915  
1916  /* called by grant_pending_locks() which means an async grant message must
1917     be sent to the requesting node in addition to granting the lock if the
1918     lkb belongs to a remote node. */
1919  
1920  static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1921  {
1922  	grant_lock(r, lkb);
1923  	if (is_master_copy(lkb))
1924  		send_grant(r, lkb);
1925  	else
1926  		queue_cast(r, lkb, 0);
1927  }
1928  
1929  /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1930     change the granted/requested modes.  We're munging things accordingly in
1931     the process copy.
1932     CONVDEADLK: our grmode may have been forced down to NL to resolve a
1933     conversion deadlock
1934     ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1935     compatible with other granted locks */
1936  
1937  static void munge_demoted(struct dlm_lkb *lkb)
1938  {
1939  	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1940  		log_print("munge_demoted %x invalid modes gr %d rq %d",
1941  			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1942  		return;
1943  	}
1944  
1945  	lkb->lkb_grmode = DLM_LOCK_NL;
1946  }
1947  
1948  static void munge_altmode(struct dlm_lkb *lkb, const struct dlm_message *ms)
1949  {
1950  	if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
1951  	    ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
1952  		log_print("munge_altmode %x invalid reply type %d",
1953  			  lkb->lkb_id, le32_to_cpu(ms->m_type));
1954  		return;
1955  	}
1956  
1957  	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1958  		lkb->lkb_rqmode = DLM_LOCK_PR;
1959  	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1960  		lkb->lkb_rqmode = DLM_LOCK_CW;
1961  	else {
1962  		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1963  		dlm_print_lkb(lkb);
1964  	}
1965  }
1966  
1967  static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1968  {
1969  	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1970  					   lkb_statequeue);
1971  	if (lkb->lkb_id == first->lkb_id)
1972  		return 1;
1973  
1974  	return 0;
1975  }
1976  
1977  /* Check if the given lkb conflicts with another lkb on the queue. */
1978  
1979  static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1980  {
1981  	struct dlm_lkb *this;
1982  
1983  	list_for_each_entry(this, head, lkb_statequeue) {
1984  		if (this == lkb)
1985  			continue;
1986  		if (!modes_compat(this, lkb))
1987  			return 1;
1988  	}
1989  	return 0;
1990  }
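
/* For reference (an editorial illustration, hedged): modes_compat() and
   __dlm_compat_matrix encode the standard VMS/DLM mode compatibility,
   which to the best of our understanding is:

              NL  CR  CW  PR  PW  EX
        NL     y   y   y   y   y   y
        CR     y   y   y   y   y   n
        CW     y   y   y   n   n   n
        PR     y   y   n   y   n   n
        PW     y   y   n   n   n   n
        EX     y   n   n   n   n   n

   so, for example, queue_conflict() reports a conflict if an EX request
   finds anything other than NL on the queue. */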
1991  
1992  /*
1993   * "A conversion deadlock arises with a pair of lock requests in the converting
1994   * queue for one resource.  The granted mode of each lock blocks the requested
1995   * mode of the other lock."
1996   *
1997   * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1998   * convert queue from being granted, then deadlk/demote lkb.
1999   *
2000   * Example:
2001   * Granted Queue: empty
2002   * Convert Queue: NL->EX (first lock)
2003   *                PR->EX (second lock)
2004   *
2005   * The first lock can't be granted because of the granted mode of the second
2006   * lock and the second lock can't be granted because it's not first in the
2007   * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2008   * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2009   * flag set and return DEMOTED in the lksb flags.
2010   *
2011   * Originally, this function detected conv-deadlk in a more limited scope:
2012   * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2013   * - if lkb1 was the first entry in the queue (not just earlier), and was
2014   *   blocked by the granted mode of lkb2, and there was nothing on the
2015   *   granted queue preventing lkb1 from being granted immediately, i.e.
2016   *   lkb2 was the only thing preventing lkb1 from being granted.
2017   *
2018   * That second condition meant we'd only say there was conv-deadlk if
2019   * resolving it (by demotion) would lead to the first lock on the convert
2020   * queue being granted right away.  It allowed conversion deadlocks to exist
2021   * between locks on the convert queue while they couldn't be granted anyway.
2022   *
2023   * Now, we detect and take action on conversion deadlocks immediately when
2024   * they're created, even if they may not be immediately consequential.  If
2025   * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2026   * mode that would prevent lkb1's conversion from being granted, we do a
2027   * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2028   * I think this means that the lkb_is_ahead condition below should always
2029   * be zero, i.e. there will never be conv-deadlk between two locks that are
2030   * both already on the convert queue.
2031   */
2032  
2033  static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2034  {
2035  	struct dlm_lkb *lkb1;
2036  	int lkb_is_ahead = 0;
2037  
2038  	list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2039  		if (lkb1 == lkb2) {
2040  			lkb_is_ahead = 1;
2041  			continue;
2042  		}
2043  
2044  		if (!lkb_is_ahead) {
2045  			if (!modes_compat(lkb2, lkb1))
2046  				return 1;
2047  		} else {
2048  			if (!modes_compat(lkb2, lkb1) &&
2049  			    !modes_compat(lkb1, lkb2))
2050  				return 1;
2051  		}
2052  	}
2053  	return 0;
2054  }
2055  
2056  /*
2057   * Return 1 if the lock can be granted, 0 otherwise.
2058   * Also detect and resolve conversion deadlocks.
2059   *
2060   * lkb is the lock to be granted
2061   *
2062   * now is 1 if the function is being called in the context of the
2063   * immediate request; it is 0 if called later, after the lock has been
2064   * queued.
2065   *
2066   * recover is 1 if dlm_recover_grant() is trying to grant conversions
2067   * after recovery.
2068   *
2069   * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2070   */
2071  
2072  static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2073  			   int recover)
2074  {
2075  	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2076  
2077  	/*
2078  	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
2079  	 * a new request for a NL mode lock being blocked.
2080  	 *
2081  	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2082  	 * request, then it would be granted.  In essence, the use of this flag
2083  	 * tells the Lock Manager to expedite this request by not considering
2084  	 * what may be in the CONVERTING or WAITING queues...  As of this
2085  	 * writing, the EXPEDITE flag can be used only with new requests for NL
2086  	 * mode locks.  This flag is not valid for conversion requests.
2087  	 *
2088  	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
2089  	 * conversion or used with a non-NL requested mode.  We also know an
2090  	 * EXPEDITE request is always granted immediately, so now must always
2091  	 * be 1.  The full condition to grant an expedite request: (now &&
2092  	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2093  	 * therefore be shortened to just checking the flag.
2094  	 */
2095  
2096  	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2097  		return 1;
2098  
2099  	/*
2100  	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2101  	 * added to the remaining conditions.
2102  	 */
2103  
2104  	if (queue_conflict(&r->res_grantqueue, lkb))
2105  		return 0;
2106  
2107  	/*
2108  	 * 6-3: By default, a conversion request is immediately granted if the
2109  	 * requested mode is compatible with the modes of all other granted
2110  	 * locks
2111  	 */
2112  
2113  	if (queue_conflict(&r->res_convertqueue, lkb))
2114  		return 0;
2115  
2116  	/*
2117  	 * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2118  	 * locks for a recovered rsb, on which lkb's have been rebuilt.
2119  	 * The lkb's may have been rebuilt on the queues in a different
2120  	 * order than they were in on the previous master.  So, granting
2121  	 * queued conversions in order after recovery doesn't make sense
2122  	 * since the order hasn't been preserved anyway.  The new order
2123  	 * could also have created a new "in place" conversion deadlock.
2124  	 * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2125  	 * After recovery, there would be no granted locks, and possibly
2126  	 * NL->EX, PR->EX, an in-place conversion deadlock.)  So, after
2127  	 * recovery, grant conversions without considering order.
2128  	 */
2129  
2130  	if (conv && recover)
2131  		return 1;
2132  
2133  	/*
2134  	 * 6-5: But the default algorithm for deciding whether to grant or
2135  	 * queue conversion requests does not by itself guarantee that such
2136  	 * requests are serviced on a "first come first serve" basis.  This, in
2137  	 * turn, can lead to a phenomenon known as "indefinite postponement".
2138  	 *
2139  	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
2140  	 * the system service employed to request a lock conversion.  This flag
2141  	 * forces certain conversion requests to be queued, even if they are
2142  	 * compatible with the granted modes of other locks on the same
2143  	 * resource.  Thus, the use of this flag results in conversion requests
2144  	 * being ordered on a "first come first serve" basis.
2145  	 *
2146  	 * DCT: This condition is all about new conversions being able to occur
2147  	 * "in place" while the lock remains on the granted queue (assuming
2148  	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
2149  	 * doesn't _have_ to go onto the convert queue where it's processed in
2150  	 * order.  The "now" variable is necessary to distinguish converts
2151  	 * being received and processed for the first time now, because once a
2152  	 * convert is moved to the conversion queue the condition below applies
2153  	 * requiring fifo granting.
2154  	 */
2155  
2156  	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2157  		return 1;
2158  
2159  	/*
2160  	 * Even if the convert is compat with all granted locks,
2161  	 * QUECVT forces it behind other locks on the convert queue.
2162  	 */
2163  
2164  	if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2165  		if (list_empty(&r->res_convertqueue))
2166  			return 1;
2167  		else
2168  			return 0;
2169  	}
2170  
2171  	/*
2172  	 * The NOORDER flag is set to avoid the standard vms rules on grant
2173  	 * order.
2174  	 */
2175  
2176  	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2177  		return 1;
2178  
2179  	/*
2180  	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2181  	 * granted until all other conversion requests ahead of it are granted
2182  	 * and/or canceled.
2183  	 */
2184  
2185  	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2186  		return 1;
2187  
2188  	/*
2189  	 * 6-4: By default, a new request is immediately granted only if all
2190  	 * three of the following conditions are satisfied when the request is
2191  	 * issued:
2192  	 * - The queue of ungranted conversion requests for the resource is
2193  	 *   empty.
2194  	 * - The queue of ungranted new requests for the resource is empty.
2195  	 * - The mode of the new request is compatible with the most
2196  	 *   restrictive mode of all granted locks on the resource.
2197  	 */
2198  
2199  	if (now && !conv && list_empty(&r->res_convertqueue) &&
2200  	    list_empty(&r->res_waitqueue))
2201  		return 1;
2202  
2203  	/*
2204  	 * 6-4: Once a lock request is in the queue of ungranted new requests,
2205  	 * it cannot be granted until the queue of ungranted conversion
2206  	 * requests is empty, all ungranted new requests ahead of it are
2207  	 * granted and/or canceled, and it is compatible with the granted mode
2208  	 * of the most restrictive lock granted on the resource.
2209  	 */
2210  
2211  	if (!now && !conv && list_empty(&r->res_convertqueue) &&
2212  	    first_in_list(lkb, &r->res_waitqueue))
2213  		return 1;
2214  
2215  	return 0;
2216  }
2217  
2218  static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2219  			  int recover, int *err)
2220  {
2221  	int rv;
2222  	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2223  	int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2224  
2225  	if (err)
2226  		*err = 0;
2227  
2228  	rv = _can_be_granted(r, lkb, now, recover);
2229  	if (rv)
2230  		goto out;
2231  
2232  	/*
2233  	 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2234  	 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2235  	 * cancels one of the locks.
2236  	 */
2237  
2238  	if (is_convert && can_be_queued(lkb) &&
2239  	    conversion_deadlock_detect(r, lkb)) {
2240  		if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2241  			lkb->lkb_grmode = DLM_LOCK_NL;
2242  			set_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
2243  		} else if (err) {
2244  			*err = -EDEADLK;
2245  		} else {
2246  			log_print("can_be_granted deadlock %x now %d",
2247  				  lkb->lkb_id, now);
2248  			dlm_dump_rsb(r);
2249  		}
2250  		goto out;
2251  	}
2252  
2253  	/*
2254  	 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2255  	 * to grant a request in a mode other than the normal rqmode.  It's a
2256  	 * simple way to provide a big optimization to applications that can
2257  	 * use them.
2258  	 */
2259  
2260  	if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2261  		alt = DLM_LOCK_PR;
2262  	else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2263  		alt = DLM_LOCK_CW;
2264  
2265  	if (alt) {
2266  		lkb->lkb_rqmode = alt;
2267  		rv = _can_be_granted(r, lkb, now, 0);
2268  		if (rv)
2269  			set_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
2270  		else
2271  			lkb->lkb_rqmode = rqmode;
2272  	}
2273   out:
2274  	return rv;
2275  }
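
/* A hedged usage sketch of the ALTPR/ALTCW behavior above (the caller-side
   names here are hypothetical, not part of this file): an application that
   can make do with PR when PW is contended might issue

	error = dlm_lock(ls, DLM_LOCK_PW, &lksb, DLM_LKF_ALTPR,
			 name, namelen, 0, my_ast, my_arg, my_bast);

   and then, in the completion ast, test lksb.sb_flags for DLM_SBF_ALTMODE
   to learn that the lock was granted in the alternate mode (PR here)
   rather than the requested PW. */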
2276  
2277  /* Returns the highest requested mode of all blocked conversions; sets
2278     cw if there's a blocked conversion to DLM_LOCK_CW. */
2279  
2280  static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2281  				 unsigned int *count)
2282  {
2283  	struct dlm_lkb *lkb, *s;
2284  	int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2285  	int hi, demoted, quit, grant_restart, demote_restart;
2286  	int deadlk;
2287  
2288  	quit = 0;
2289   restart:
2290  	grant_restart = 0;
2291  	demote_restart = 0;
2292  	hi = DLM_LOCK_IV;
2293  
2294  	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2295  		demoted = is_demoted(lkb);
2296  		deadlk = 0;
2297  
2298  		if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2299  			grant_lock_pending(r, lkb);
2300  			grant_restart = 1;
2301  			if (count)
2302  				(*count)++;
2303  			continue;
2304  		}
2305  
2306  		if (!demoted && is_demoted(lkb)) {
2307  			log_print("WARN: pending demoted %x node %d %s",
2308  				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2309  			demote_restart = 1;
2310  			continue;
2311  		}
2312  
2313  		if (deadlk) {
2314  			/*
2315  			 * If the DLM_LKF_NODLCKWT flag is set and a conversion
2316  			 * deadlock is detected, queue a blocking AST and leave
2317  			 * the application to down-convert or cancel it.
2318  			 */
2319  			if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2320  				if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2321  					queue_bast(r, lkb, lkb->lkb_rqmode);
2322  					lkb->lkb_highbast = lkb->lkb_rqmode;
2323  				}
2324  			} else {
2325  				log_print("WARN: pending deadlock %x node %d %s",
2326  					  lkb->lkb_id, lkb->lkb_nodeid,
2327  					  r->res_name);
2328  				dlm_dump_rsb(r);
2329  			}
2330  			continue;
2331  		}
2332  
2333  		hi = max_t(int, lkb->lkb_rqmode, hi);
2334  
2335  		if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2336  			*cw = 1;
2337  	}
2338  
2339  	if (grant_restart)
2340  		goto restart;
2341  	if (demote_restart && !quit) {
2342  		quit = 1;
2343  		goto restart;
2344  	}
2345  
2346  	return max_t(int, high, hi);
2347  }
2348  
2349  static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2350  			      unsigned int *count)
2351  {
2352  	struct dlm_lkb *lkb, *s;
2353  
2354  	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2355  		if (can_be_granted(r, lkb, 0, 0, NULL)) {
2356  			grant_lock_pending(r, lkb);
2357  			if (count)
2358  				(*count)++;
2359  		} else {
2360  			high = max_t(int, lkb->lkb_rqmode, high);
2361  			if (lkb->lkb_rqmode == DLM_LOCK_CW)
2362  				*cw = 1;
2363  		}
2364  	}
2365  
2366  	return high;
2367  }
2368  
2369  /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2370     on either the convert or waiting queue.
2371     high is the largest rqmode of all locks blocked on the convert or
2372     waiting queue. */
2373  
2374  static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2375  {
2376  	if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2377  		if (gr->lkb_highbast < DLM_LOCK_EX)
2378  			return 1;
2379  		return 0;
2380  	}
2381  
2382  	if (gr->lkb_highbast < high &&
2383  	    !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2384  		return 1;
2385  	return 0;
2386  }
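
/* A worked example of the threshold above: a holder granted PR whose
   lkb_highbast is still 0 sees a blocked EX request (high == DLM_LOCK_EX),
   fails the compatibility test and is sent a blocking ast for EX;
   grant_pending_locks() then raises lkb_highbast to EX, so a second
   blocked EX request does not generate a duplicate bast. */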
2387  
2388  static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2389  {
2390  	struct dlm_lkb *lkb, *s;
2391  	int high = DLM_LOCK_IV;
2392  	int cw = 0;
2393  
2394  	if (!is_master(r)) {
2395  		log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2396  		dlm_dump_rsb(r);
2397  		return;
2398  	}
2399  
2400  	high = grant_pending_convert(r, high, &cw, count);
2401  	high = grant_pending_wait(r, high, &cw, count);
2402  
2403  	if (high == DLM_LOCK_IV)
2404  		return;
2405  
2406  	/*
2407  	 * If there are locks left on the wait/convert queue then send blocking
2408  	 * ASTs to granted locks based on the largest requested mode (high)
2409  	 * found above.
2410  	 */
2411  
2412  	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2413  		if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2414  			if (cw && high == DLM_LOCK_PR &&
2415  			    lkb->lkb_grmode == DLM_LOCK_PR)
2416  				queue_bast(r, lkb, DLM_LOCK_CW);
2417  			else
2418  				queue_bast(r, lkb, high);
2419  			lkb->lkb_highbast = high;
2420  		}
2421  	}
2422  }
2423  
2424  static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2425  {
2426  	if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2427  	    (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2428  		if (gr->lkb_highbast < DLM_LOCK_EX)
2429  			return 1;
2430  		return 0;
2431  	}
2432  
2433  	if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2434  		return 1;
2435  	return 0;
2436  }
2437  
2438  static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2439  			    struct dlm_lkb *lkb)
2440  {
2441  	struct dlm_lkb *gr;
2442  
2443  	list_for_each_entry(gr, head, lkb_statequeue) {
2444  		/* skip self when sending basts to convertqueue */
2445  		if (gr == lkb)
2446  			continue;
2447  		if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2448  			queue_bast(r, gr, lkb->lkb_rqmode);
2449  			gr->lkb_highbast = lkb->lkb_rqmode;
2450  		}
2451  	}
2452  }
2453  
2454  static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2455  {
2456  	send_bast_queue(r, &r->res_grantqueue, lkb);
2457  }
2458  
2459  static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2460  {
2461  	send_bast_queue(r, &r->res_grantqueue, lkb);
2462  	send_bast_queue(r, &r->res_convertqueue, lkb);
2463  }
2464  
2465  /* set_master(r, lkb) -- set the master nodeid of a resource
2466  
2467     The purpose of this function is to set the nodeid field in the given
2468     lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2469     known, it can just be copied to the lkb and the function will return
2470     0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2471     before it can be copied to the lkb.
2472  
2473     When the rsb nodeid is being looked up remotely, the initial lkb
2474     causing the lookup is kept on the ls_waiters list waiting for the
2475     lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2476     on the rsb's res_lookup list until the master is verified.
2477  
2478     Return values:
2479     0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2480     1: the rsb master is not available and the lkb has been placed on
2481        a wait queue
2482  */
2483  
2484  static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2485  {
2486  	int our_nodeid = dlm_our_nodeid();
2487  
2488  	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2489  		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2490  		r->res_first_lkid = lkb->lkb_id;
2491  		lkb->lkb_nodeid = r->res_nodeid;
2492  		return 0;
2493  	}
2494  
2495  	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2496  		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2497  		return 1;
2498  	}
2499  
2500  	if (r->res_master_nodeid == our_nodeid) {
2501  		lkb->lkb_nodeid = 0;
2502  		return 0;
2503  	}
2504  
2505  	if (r->res_master_nodeid) {
2506  		lkb->lkb_nodeid = r->res_master_nodeid;
2507  		return 0;
2508  	}
2509  
2510  	if (dlm_dir_nodeid(r) == our_nodeid) {
2511  		/* This is a somewhat unusual case; find_rsb will usually
2512  		   have set res_master_nodeid when dir nodeid is local, but
2513  		   there are cases where we become the dir node after we've
2514  		   passed find_rsb and go through _request_lock again.
2515  		   confirm_master() or process_lookup_list() needs to be
2516  		   called after this. */
2517  		log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2518  			  lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2519  			  r->res_name);
2520  		r->res_master_nodeid = our_nodeid;
2521  		r->res_nodeid = 0;
2522  		lkb->lkb_nodeid = 0;
2523  		return 0;
2524  	}
2525  
2526  	r->res_first_lkid = lkb->lkb_id;
2527  	send_lookup(r, lkb);
2528  	return 1;
2529  }
2530  
2531  static void process_lookup_list(struct dlm_rsb *r)
2532  {
2533  	struct dlm_lkb *lkb, *safe;
2534  
2535  	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2536  		list_del_init(&lkb->lkb_rsb_lookup);
2537  		_request_lock(r, lkb);
2538  		schedule();
2539  	}
2540  }
2541  
2542  /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2543  
2544  static void confirm_master(struct dlm_rsb *r, int error)
2545  {
2546  	struct dlm_lkb *lkb;
2547  
2548  	if (!r->res_first_lkid)
2549  		return;
2550  
2551  	switch (error) {
2552  	case 0:
2553  	case -EINPROGRESS:
2554  		r->res_first_lkid = 0;
2555  		process_lookup_list(r);
2556  		break;
2557  
2558  	case -EAGAIN:
2559  	case -EBADR:
2560  	case -ENOTBLK:
2561  		/* the remote request failed and won't be retried (it was
2562  		   a NOQUEUE, or has been canceled/unlocked); make a waiting
2563  		   lkb the first_lkid */
2564  
2565  		r->res_first_lkid = 0;
2566  
2567  		if (!list_empty(&r->res_lookup)) {
2568  			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2569  					 lkb_rsb_lookup);
2570  			list_del_init(&lkb->lkb_rsb_lookup);
2571  			r->res_first_lkid = lkb->lkb_id;
2572  			_request_lock(r, lkb);
2573  		}
2574  		break;
2575  
2576  	default:
2577  		log_error(r->res_ls, "confirm_master unknown error %d", error);
2578  	}
2579  }
2580  
2581  static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2582  			 int namelen, void (*ast)(void *astparam),
2583  			 void *astparam,
2584  			 void (*bast)(void *astparam, int mode),
2585  			 struct dlm_args *args)
2586  {
2587  	int rv = -EINVAL;
2588  
2589  	/* check for invalid arg usage */
2590  
2591  	if (mode < 0 || mode > DLM_LOCK_EX)
2592  		goto out;
2593  
2594  	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2595  		goto out;
2596  
2597  	if (flags & DLM_LKF_CANCEL)
2598  		goto out;
2599  
2600  	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2601  		goto out;
2602  
2603  	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2604  		goto out;
2605  
2606  	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2607  		goto out;
2608  
2609  	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2610  		goto out;
2611  
2612  	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2613  		goto out;
2614  
2615  	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2616  		goto out;
2617  
2618  	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2619  		goto out;
2620  
2621  	if (!ast || !lksb)
2622  		goto out;
2623  
2624  	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2625  		goto out;
2626  
2627  	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2628  		goto out;
2629  
2630  	/* these args will be copied to the lkb in validate_lock_args;
2631  	   this cannot be done now because when converting locks, fields in
2632  	   an active lkb cannot be modified before locking the rsb */
2633  
2634  	args->flags = flags;
2635  	args->astfn = ast;
2636  	args->astparam = astparam;
2637  	args->bastfn = bast;
2638  	args->mode = mode;
2639  	args->lksb = lksb;
2640  	rv = 0;
2641   out:
2642  	return rv;
2643  }
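
/* Illustrative (non-exhaustive) examples of the validation above:
   DLM_LKF_EXPEDITE is only accepted on a new DLM_LOCK_NL request, so both

	dlm_lock(ls, DLM_LOCK_NL, &lksb, DLM_LKF_EXPEDITE | DLM_LKF_CONVERT, ...);
	dlm_lock(ls, DLM_LOCK_PR, &lksb, DLM_LKF_EXPEDITE, ...);

   fail with -EINVAL, as do DLM_LKF_QUECVT or DLM_LKF_CONVDEADLK without
   DLM_LKF_CONVERT, DLM_LKF_VALBLK with a NULL lksb.sb_lvbptr, and a
   conversion whose lksb.sb_lkid is zero. */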
2644  
2645  static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2646  {
2647  	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2648  		      DLM_LKF_FORCEUNLOCK))
2649  		return -EINVAL;
2650  
2651  	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2652  		return -EINVAL;
2653  
2654  	args->flags = flags;
2655  	args->astparam = astarg;
2656  	return 0;
2657  }
2658  
2659  static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2660  			      struct dlm_args *args)
2661  {
2662  	int rv = -EBUSY;
2663  
2664  	if (args->flags & DLM_LKF_CONVERT) {
2665  		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2666  			goto out;
2667  
2668  		/* lock not allowed if there's any op in progress */
2669  		if (lkb->lkb_wait_type || atomic_read(&lkb->lkb_wait_count))
2670  			goto out;
2671  
2672  		if (is_overlap(lkb))
2673  			goto out;
2674  
2675  		rv = -EINVAL;
2676  		if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
2677  			goto out;
2678  
2679  		if (args->flags & DLM_LKF_QUECVT &&
2680  		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2681  			goto out;
2682  	}
2683  
2684  	lkb->lkb_exflags = args->flags;
2685  	dlm_set_sbflags_val(lkb, 0);
2686  	lkb->lkb_astfn = args->astfn;
2687  	lkb->lkb_astparam = args->astparam;
2688  	lkb->lkb_bastfn = args->bastfn;
2689  	lkb->lkb_rqmode = args->mode;
2690  	lkb->lkb_lksb = args->lksb;
2691  	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2692  	lkb->lkb_ownpid = (int) current->pid;
2693  	rv = 0;
2694   out:
2695  	switch (rv) {
2696  	case 0:
2697  		break;
2698  	case -EINVAL:
2699  		/* annoy the user because dlm usage is wrong */
2700  		WARN_ON(1);
2701  		log_error(ls, "%s %d %x %x %x %d %d %s", __func__,
2702  			  rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2703  			  lkb->lkb_status, lkb->lkb_wait_type,
2704  			  lkb->lkb_resource->res_name);
2705  		break;
2706  	default:
2707  		log_debug(ls, "%s %d %x %x %x %d %d %s", __func__,
2708  			  rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2709  			  lkb->lkb_status, lkb->lkb_wait_type,
2710  			  lkb->lkb_resource->res_name);
2711  		break;
2712  	}
2713  
2714  	return rv;
2715  }
2716  
2717  /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2718     for success */
2719  
2720  /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2721     because there may be a lookup in progress and it's valid to do
2722     cancel/unlockf on it */
2723  
2724  static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2725  {
2726  	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2727  	int rv = -EBUSY;
2728  
2729  	/* normal unlock not allowed if there's any op in progress */
2730  	if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) &&
2731  	    (lkb->lkb_wait_type || atomic_read(&lkb->lkb_wait_count)))
2732  		goto out;
2733  
2734  	/* an lkb may be waiting for an rsb lookup to complete where the
2735  	   lookup was initiated by another lock */
2736  
2737  	if (!list_empty(&lkb->lkb_rsb_lookup)) {
2738  		if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2739  			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2740  			list_del_init(&lkb->lkb_rsb_lookup);
2741  			queue_cast(lkb->lkb_resource, lkb,
2742  				   args->flags & DLM_LKF_CANCEL ?
2743  				   -DLM_ECANCEL : -DLM_EUNLOCK);
2744  			unhold_lkb(lkb); /* undoes create_lkb() */
2745  		}
2746  		/* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2747  		goto out;
2748  	}
2749  
2750  	rv = -EINVAL;
2751  	if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
2752  		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2753  		dlm_print_lkb(lkb);
2754  		goto out;
2755  	}
2756  
2757  	/* an lkb may still exist even though the lock is EOL'ed due to a
2758  	 * cancel, unlock or failed noqueue request; an app can't use these
2759  	 * locks; return same error as if the lkid had not been found at all
2760  	 */
2761  
2762  	if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
2763  		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2764  		rv = -ENOENT;
2765  		goto out;
2766  	}
2767  
2768  	/* cancel not allowed with another cancel/unlock in progress */
2769  
2770  	if (args->flags & DLM_LKF_CANCEL) {
2771  		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2772  			goto out;
2773  
2774  		if (is_overlap(lkb))
2775  			goto out;
2776  
2777  		if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2778  			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2779  			rv = -EBUSY;
2780  			goto out;
2781  		}
2782  
2783  		/* there's nothing to cancel */
2784  		if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2785  		    !lkb->lkb_wait_type) {
2786  			rv = -EBUSY;
2787  			goto out;
2788  		}
2789  
2790  		switch (lkb->lkb_wait_type) {
2791  		case DLM_MSG_LOOKUP:
2792  		case DLM_MSG_REQUEST:
2793  			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2794  			rv = -EBUSY;
2795  			goto out;
2796  		case DLM_MSG_UNLOCK:
2797  		case DLM_MSG_CANCEL:
2798  			goto out;
2799  		}
2800  		/* add_to_waiters() will set OVERLAP_CANCEL */
2801  		goto out_ok;
2802  	}
2803  
2804  	/* do we need to allow a force-unlock if there's a normal unlock
2805  	   already in progress?  in what conditions could the normal unlock
2806  	   fail such that we'd want to send a force-unlock to be sure? */
2807  
2808  	if (args->flags & DLM_LKF_FORCEUNLOCK) {
2809  		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2810  			goto out;
2811  
2812  		if (is_overlap_unlock(lkb))
2813  			goto out;
2814  
2815  		if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2816  			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2817  			rv = -EBUSY;
2818  			goto out;
2819  		}
2820  
2821  		switch (lkb->lkb_wait_type) {
2822  		case DLM_MSG_LOOKUP:
2823  		case DLM_MSG_REQUEST:
2824  			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2825  			rv = -EBUSY;
2826  			goto out;
2827  		case DLM_MSG_UNLOCK:
2828  			goto out;
2829  		}
2830  		/* add_to_waiters() will set OVERLAP_UNLOCK */
2831  	}
2832  
2833   out_ok:
2834  	/* an overlapping op shouldn't blow away exflags from other op */
2835  	lkb->lkb_exflags |= args->flags;
2836  	dlm_set_sbflags_val(lkb, 0);
2837  	lkb->lkb_astparam = args->astparam;
2838  	rv = 0;
2839   out:
2840  	switch (rv) {
2841  	case 0:
2842  		break;
2843  	case -EINVAL:
2844  		/* annoy the user because dlm usage is wrong */
2845  		WARN_ON(1);
2846  		log_error(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
2847  			  lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
2848  			  args->flags, lkb->lkb_wait_type,
2849  			  lkb->lkb_resource->res_name);
2850  		break;
2851  	default:
2852  		log_debug(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
2853  			  lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
2854  			  args->flags, lkb->lkb_wait_type,
2855  			  lkb->lkb_resource->res_name);
2856  		break;
2857  	}
2858  
2859  	return rv;
2860  }
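
/* An illustrative consequence of the overlap handling above: a dlm_unlock()
   call with DLM_LKF_CANCEL made while the original request is still on the
   waiters list (wait_type DLM_MSG_REQUEST) does not send a separate cancel;
   OVERLAP_CANCEL is set on the lkb, -EBUSY is returned here, dlm_unlock()
   turns that -EBUSY into 0 for the caller, and the cancel is resolved when
   the outstanding request reply arrives. */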
2861  
2862  /*
2863   * Four stage 4 varieties:
2864   * do_request(), do_convert(), do_unlock(), do_cancel()
2865   * These are called on the master node for the given lock and
2866   * from the central locking logic.
2867   */
2868  
2869  static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2870  {
2871  	int error = 0;
2872  
2873  	if (can_be_granted(r, lkb, 1, 0, NULL)) {
2874  		grant_lock(r, lkb);
2875  		queue_cast(r, lkb, 0);
2876  		goto out;
2877  	}
2878  
2879  	if (can_be_queued(lkb)) {
2880  		error = -EINPROGRESS;
2881  		add_lkb(r, lkb, DLM_LKSTS_WAITING);
2882  		goto out;
2883  	}
2884  
2885  	error = -EAGAIN;
2886  	queue_cast(r, lkb, -EAGAIN);
2887   out:
2888  	return error;
2889  }
2890  
2891  static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2892  			       int error)
2893  {
2894  	switch (error) {
2895  	case -EAGAIN:
2896  		if (force_blocking_asts(lkb))
2897  			send_blocking_asts_all(r, lkb);
2898  		break;
2899  	case -EINPROGRESS:
2900  		send_blocking_asts(r, lkb);
2901  		break;
2902  	}
2903  }
2904  
2905  static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2906  {
2907  	int error = 0;
2908  	int deadlk = 0;
2909  
2910  	/* changing an existing lock may allow others to be granted */
2911  
2912  	if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
2913  		grant_lock(r, lkb);
2914  		queue_cast(r, lkb, 0);
2915  		goto out;
2916  	}
2917  
2918  	/* can_be_granted() detected that this lock would block in a conversion
2919  	   deadlock, so we leave it on the granted queue and return EDEADLK in
2920  	   the ast for the convert. */
2921  
2922  	if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
2923  		/* it's left on the granted queue */
2924  		revert_lock(r, lkb);
2925  		queue_cast(r, lkb, -EDEADLK);
2926  		error = -EDEADLK;
2927  		goto out;
2928  	}
2929  
2930  	/* is_demoted() means the can_be_granted() above set the grmode
2931  	   to NL, and left us on the granted queue.  This auto-demotion
2932  	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
2933  	   now grantable.  We have to try to grant other converting locks
2934  	   before we try again to grant this one. */
2935  
2936  	if (is_demoted(lkb)) {
2937  		grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
2938  		if (_can_be_granted(r, lkb, 1, 0)) {
2939  			grant_lock(r, lkb);
2940  			queue_cast(r, lkb, 0);
2941  			goto out;
2942  		}
2943  		/* else fall through and move to convert queue */
2944  	}
2945  
2946  	if (can_be_queued(lkb)) {
2947  		error = -EINPROGRESS;
2948  		del_lkb(r, lkb);
2949  		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2950  		goto out;
2951  	}
2952  
2953  	error = -EAGAIN;
2954  	queue_cast(r, lkb, -EAGAIN);
2955   out:
2956  	return error;
2957  }
2958  
2959  static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2960  			       int error)
2961  {
2962  	switch (error) {
2963  	case 0:
2964  		grant_pending_locks(r, NULL);
2965  		/* grant_pending_locks also sends basts */
2966  		break;
2967  	case -EAGAIN:
2968  		if (force_blocking_asts(lkb))
2969  			send_blocking_asts_all(r, lkb);
2970  		break;
2971  	case -EINPROGRESS:
2972  		send_blocking_asts(r, lkb);
2973  		break;
2974  	}
2975  }
2976  
2977  static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2978  {
2979  	remove_lock(r, lkb);
2980  	queue_cast(r, lkb, -DLM_EUNLOCK);
2981  	return -DLM_EUNLOCK;
2982  }
2983  
2984  static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2985  			      int error)
2986  {
2987  	grant_pending_locks(r, NULL);
2988  }
2989  
2990  /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2991  
2992  static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2993  {
2994  	int error;
2995  
2996  	error = revert_lock(r, lkb);
2997  	if (error) {
2998  		queue_cast(r, lkb, -DLM_ECANCEL);
2999  		return -DLM_ECANCEL;
3000  	}
3001  	return 0;
3002  }
3003  
3004  static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3005  			      int error)
3006  {
3007  	if (error)
3008  		grant_pending_locks(r, NULL);
3009  }
3010  
3011  /*
3012   * Four stage 3 varieties:
3013   * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3014   */
3015  
3016  /* add a new lkb to a possibly new rsb, called by requesting process */
3017  
3018  static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3019  {
3020  	int error;
3021  
3022  	/* set_master: sets lkb nodeid from r */
3023  
3024  	error = set_master(r, lkb);
3025  	if (error < 0)
3026  		goto out;
3027  	if (error) {
3028  		error = 0;
3029  		goto out;
3030  	}
3031  
3032  	if (is_remote(r)) {
3033  		/* receive_request() calls do_request() on remote node */
3034  		error = send_request(r, lkb);
3035  	} else {
3036  		error = do_request(r, lkb);
3037  		/* for remote locks the request_reply is sent
3038  		   between do_request and do_request_effects */
3039  		do_request_effects(r, lkb, error);
3040  	}
3041   out:
3042  	return error;
3043  }
3044  
3045  /* change some property of an existing lkb, e.g. mode */
3046  
3047  static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3048  {
3049  	int error;
3050  
3051  	if (is_remote(r)) {
3052  		/* receive_convert() calls do_convert() on remote node */
3053  		error = send_convert(r, lkb);
3054  	} else {
3055  		error = do_convert(r, lkb);
3056  		/* for remote locks the convert_reply is sent
3057  		   between do_convert and do_convert_effects */
3058  		do_convert_effects(r, lkb, error);
3059  	}
3060  
3061  	return error;
3062  }
3063  
3064  /* remove an existing lkb from the granted queue */
3065  
3066  static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3067  {
3068  	int error;
3069  
3070  	if (is_remote(r)) {
3071  		/* receive_unlock() calls do_unlock() on remote node */
3072  		error = send_unlock(r, lkb);
3073  	} else {
3074  		error = do_unlock(r, lkb);
3075  		/* for remote locks the unlock_reply is sent
3076  		   between do_unlock and do_unlock_effects */
3077  		do_unlock_effects(r, lkb, error);
3078  	}
3079  
3080  	return error;
3081  }
3082  
3083  /* remove an existing lkb from the convert or wait queue */
3084  
3085  static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3086  {
3087  	int error;
3088  
3089  	if (is_remote(r)) {
3090  		/* receive_cancel() calls do_cancel() on remote node */
3091  		error = send_cancel(r, lkb);
3092  	} else {
3093  		error = do_cancel(r, lkb);
3094  		/* for remote locks the cancel_reply is sent
3095  		   between do_cancel and do_cancel_effects */
3096  		do_cancel_effects(r, lkb, error);
3097  	}
3098  
3099  	return error;
3100  }
3101  
3102  /*
3103   * Four stage 2 varieties:
3104   * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3105   */
3106  
3107  static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3108  			const void *name, int len,
3109  			struct dlm_args *args)
3110  {
3111  	struct dlm_rsb *r;
3112  	int error;
3113  
3114  	error = validate_lock_args(ls, lkb, args);
3115  	if (error)
3116  		return error;
3117  
3118  	error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3119  	if (error)
3120  		return error;
3121  
3122  	lock_rsb(r);
3123  
3124  	attach_lkb(r, lkb);
3125  	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3126  
3127  	error = _request_lock(r, lkb);
3128  
3129  	unlock_rsb(r);
3130  	put_rsb(r);
3131  	return error;
3132  }
3133  
3134  static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3135  			struct dlm_args *args)
3136  {
3137  	struct dlm_rsb *r;
3138  	int error;
3139  
3140  	r = lkb->lkb_resource;
3141  
3142  	hold_rsb(r);
3143  	lock_rsb(r);
3144  
3145  	error = validate_lock_args(ls, lkb, args);
3146  	if (error)
3147  		goto out;
3148  
3149  	error = _convert_lock(r, lkb);
3150   out:
3151  	unlock_rsb(r);
3152  	put_rsb(r);
3153  	return error;
3154  }
3155  
3156  static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3157  		       struct dlm_args *args)
3158  {
3159  	struct dlm_rsb *r;
3160  	int error;
3161  
3162  	r = lkb->lkb_resource;
3163  
3164  	hold_rsb(r);
3165  	lock_rsb(r);
3166  
3167  	error = validate_unlock_args(lkb, args);
3168  	if (error)
3169  		goto out;
3170  
3171  	error = _unlock_lock(r, lkb);
3172   out:
3173  	unlock_rsb(r);
3174  	put_rsb(r);
3175  	return error;
3176  }
3177  
3178  static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3179  		       struct dlm_args *args)
3180  {
3181  	struct dlm_rsb *r;
3182  	int error;
3183  
3184  	r = lkb->lkb_resource;
3185  
3186  	hold_rsb(r);
3187  	lock_rsb(r);
3188  
3189  	error = validate_unlock_args(lkb, args);
3190  	if (error)
3191  		goto out;
3192  
3193  	error = _cancel_lock(r, lkb);
3194   out:
3195  	unlock_rsb(r);
3196  	put_rsb(r);
3197  	return error;
3198  }
3199  
3200  /*
3201   * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
3202   */
3203  
3204  int dlm_lock(dlm_lockspace_t *lockspace,
3205  	     int mode,
3206  	     struct dlm_lksb *lksb,
3207  	     uint32_t flags,
3208  	     const void *name,
3209  	     unsigned int namelen,
3210  	     uint32_t parent_lkid,
3211  	     void (*ast) (void *astarg),
3212  	     void *astarg,
3213  	     void (*bast) (void *astarg, int mode))
3214  {
3215  	struct dlm_ls *ls;
3216  	struct dlm_lkb *lkb;
3217  	struct dlm_args args;
3218  	int error, convert = flags & DLM_LKF_CONVERT;
3219  
3220  	ls = dlm_find_lockspace_local(lockspace);
3221  	if (!ls)
3222  		return -EINVAL;
3223  
3224  	dlm_lock_recovery(ls);
3225  
3226  	if (convert)
3227  		error = find_lkb(ls, lksb->sb_lkid, &lkb);
3228  	else
3229  		error = create_lkb(ls, &lkb);
3230  
3231  	if (error)
3232  		goto out;
3233  
3234  	trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
3235  
3236  	error = set_lock_args(mode, lksb, flags, namelen, ast, astarg, bast,
3237  			      &args);
3238  	if (error)
3239  		goto out_put;
3240  
3241  	if (convert)
3242  		error = convert_lock(ls, lkb, &args);
3243  	else
3244  		error = request_lock(ls, lkb, name, namelen, &args);
3245  
3246  	if (error == -EINPROGRESS)
3247  		error = 0;
3248   out_put:
3249  	trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true);
3250  
3251  	if (convert || error)
3252  		__put_lkb(ls, lkb);
3253  	if (error == -EAGAIN || error == -EDEADLK)
3254  		error = 0;
3255   out:
3256  	dlm_unlock_recovery(ls);
3257  	dlm_put_lockspace(ls);
3258  	return error;
3259  }
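
/* A minimal, hedged sketch of in-kernel dlm_lock() usage (everything named
   example_* below is hypothetical, not part of the dlm API; real users such
   as cluster filesystems have their own glue).  dlm_lock() is asynchronous:
   a return of 0 only means the request was accepted, and the final status
   arrives in the completion ast via lksb.sb_status.

	struct example_lock {
		struct dlm_lksb lksb;
		struct completion done;
	};

	static void example_ast(void *arg)
	{
		struct example_lock *el = arg;

		complete(&el->done);
	}

	static void example_bast(void *arg, int mode)
	{
		// a real user would arrange to release or demote the lock
	}

	static int example_acquire(dlm_lockspace_t *ls, struct example_lock *el,
				   const char *name)
	{
		int error;

		init_completion(&el->done);
		error = dlm_lock(ls, DLM_LOCK_EX, &el->lksb, 0,
				 name, strlen(name), 0,
				 example_ast, el, example_bast);
		if (error)
			return error;
		wait_for_completion(&el->done);
		return el->lksb.sb_status;
	}
*/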
3260  
3261  int dlm_unlock(dlm_lockspace_t *lockspace,
3262  	       uint32_t lkid,
3263  	       uint32_t flags,
3264  	       struct dlm_lksb *lksb,
3265  	       void *astarg)
3266  {
3267  	struct dlm_ls *ls;
3268  	struct dlm_lkb *lkb;
3269  	struct dlm_args args;
3270  	int error;
3271  
3272  	ls = dlm_find_lockspace_local(lockspace);
3273  	if (!ls)
3274  		return -EINVAL;
3275  
3276  	dlm_lock_recovery(ls);
3277  
3278  	error = find_lkb(ls, lkid, &lkb);
3279  	if (error)
3280  		goto out;
3281  
3282  	trace_dlm_unlock_start(ls, lkb, flags);
3283  
3284  	error = set_unlock_args(flags, astarg, &args);
3285  	if (error)
3286  		goto out_put;
3287  
3288  	if (flags & DLM_LKF_CANCEL)
3289  		error = cancel_lock(ls, lkb, &args);
3290  	else
3291  		error = unlock_lock(ls, lkb, &args);
3292  
3293  	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3294  		error = 0;
3295  	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3296  		error = 0;
3297   out_put:
3298  	trace_dlm_unlock_end(ls, lkb, flags, error);
3299  
3300  	dlm_put_lkb(lkb);
3301   out:
3302  	dlm_unlock_recovery(ls);
3303  	dlm_put_lockspace(ls);
3304  	return error;
3305  }
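
/* And the matching hedged sketch for dlm_unlock(), continuing the
   hypothetical example above.  Unlock completion is also reported through
   the ast; sb_status is -DLM_EUNLOCK for a normal unlock and -DLM_ECANCEL
   for a cancel.

	static int example_release(dlm_lockspace_t *ls, struct example_lock *el)
	{
		int error;

		reinit_completion(&el->done);
		error = dlm_unlock(ls, el->lksb.sb_lkid, 0, &el->lksb, el);
		if (error)
			return error;
		wait_for_completion(&el->done);
		return el->lksb.sb_status == -DLM_EUNLOCK ? 0 : el->lksb.sb_status;
	}
*/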
3306  
3307  /*
3308   * send/receive routines for remote operations and replies
3309   *
3310   * send_args
3311   * send_common
3312   * send_request			receive_request
3313   * send_convert			receive_convert
3314   * send_unlock			receive_unlock
3315   * send_cancel			receive_cancel
3316   * send_grant			receive_grant
3317   * send_bast			receive_bast
3318   * send_lookup			receive_lookup
3319   * send_remove			receive_remove
3320   *
3321   * 				send_common_reply
3322   * receive_request_reply	send_request_reply
3323   * receive_convert_reply	send_convert_reply
3324   * receive_unlock_reply		send_unlock_reply
3325   * receive_cancel_reply		send_cancel_reply
3326   * receive_lookup_reply		send_lookup_reply
3327   */
3328  
3329  static int _create_message(struct dlm_ls *ls, int mb_len,
3330  			   int to_nodeid, int mstype,
3331  			   struct dlm_message **ms_ret,
3332  			   struct dlm_mhandle **mh_ret,
3333  			   gfp_t allocation)
3334  {
3335  	struct dlm_message *ms;
3336  	struct dlm_mhandle *mh;
3337  	char *mb;
3338  
3339  	/* get_buffer gives us a message handle (mh) that we need to
3340  	   pass into midcomms_commit and a message buffer (mb) that we
3341  	   write our data into */
3342  
3343  	mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, allocation, &mb);
3344  	if (!mh)
3345  		return -ENOBUFS;
3346  
3347  	ms = (struct dlm_message *) mb;
3348  
3349  	ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3350  	ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
3351  	ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
3352  	ms->m_header.h_length = cpu_to_le16(mb_len);
3353  	ms->m_header.h_cmd = DLM_MSG;
3354  
3355  	ms->m_type = cpu_to_le32(mstype);
3356  
3357  	*mh_ret = mh;
3358  	*ms_ret = ms;
3359  	return 0;
3360  }
3361  
3362  static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3363  			  int to_nodeid, int mstype,
3364  			  struct dlm_message **ms_ret,
3365  			  struct dlm_mhandle **mh_ret,
3366  			  gfp_t allocation)
3367  {
3368  	int mb_len = sizeof(struct dlm_message);
3369  
3370  	switch (mstype) {
3371  	case DLM_MSG_REQUEST:
3372  	case DLM_MSG_LOOKUP:
3373  	case DLM_MSG_REMOVE:
3374  		mb_len += r->res_length;
3375  		break;
3376  	case DLM_MSG_CONVERT:
3377  	case DLM_MSG_UNLOCK:
3378  	case DLM_MSG_REQUEST_REPLY:
3379  	case DLM_MSG_CONVERT_REPLY:
3380  	case DLM_MSG_GRANT:
3381  		if (lkb && lkb->lkb_lvbptr && (lkb->lkb_exflags & DLM_LKF_VALBLK))
3382  			mb_len += r->res_ls->ls_lvblen;
3383  		break;
3384  	}
3385  
3386  	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3387  			       ms_ret, mh_ret, allocation);
3388  }
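
/* A worked example of the sizing above: a DLM_MSG_REQUEST for a resource
   whose res_length is 20 allocates sizeof(struct dlm_message) + 20 bytes,
   and send_args() later copies the 20-byte name into the m_extra tail;
   for a DLM_MSG_CONVERT carrying an lvb the tail is ls_lvblen bytes of
   lvb data instead. */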
3389  
3390  /* further lowcomms enhancements or alternate implementations may make
3391     the return value from this function useful at some point */
3392  
3393  static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms,
3394  			const void *name, int namelen)
3395  {
3396  	dlm_midcomms_commit_mhandle(mh, name, namelen);
3397  	return 0;
3398  }
3399  
3400  static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3401  		      struct dlm_message *ms)
3402  {
3403  	ms->m_nodeid   = cpu_to_le32(lkb->lkb_nodeid);
3404  	ms->m_pid      = cpu_to_le32(lkb->lkb_ownpid);
3405  	ms->m_lkid     = cpu_to_le32(lkb->lkb_id);
3406  	ms->m_remid    = cpu_to_le32(lkb->lkb_remid);
3407  	ms->m_exflags  = cpu_to_le32(lkb->lkb_exflags);
3408  	ms->m_sbflags  = cpu_to_le32(dlm_sbflags_val(lkb));
3409  	ms->m_flags    = cpu_to_le32(dlm_dflags_val(lkb));
3410  	ms->m_lvbseq   = cpu_to_le32(lkb->lkb_lvbseq);
3411  	ms->m_status   = cpu_to_le32(lkb->lkb_status);
3412  	ms->m_grmode   = cpu_to_le32(lkb->lkb_grmode);
3413  	ms->m_rqmode   = cpu_to_le32(lkb->lkb_rqmode);
3414  	ms->m_hash     = cpu_to_le32(r->res_hash);
3415  
3416  	/* m_result and m_bastmode are set from function args,
3417  	   not from lkb fields */
3418  
3419  	if (lkb->lkb_bastfn)
3420  		ms->m_asts |= cpu_to_le32(DLM_CB_BAST);
3421  	if (lkb->lkb_astfn)
3422  		ms->m_asts |= cpu_to_le32(DLM_CB_CAST);
3423  
3424  	/* compare with switch in create_message; send_remove() doesn't
3425  	   use send_args() */
3426  
3427  	switch (ms->m_type) {
3428  	case cpu_to_le32(DLM_MSG_REQUEST):
3429  	case cpu_to_le32(DLM_MSG_LOOKUP):
3430  		memcpy(ms->m_extra, r->res_name, r->res_length);
3431  		break;
3432  	case cpu_to_le32(DLM_MSG_CONVERT):
3433  	case cpu_to_le32(DLM_MSG_UNLOCK):
3434  	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3435  	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3436  	case cpu_to_le32(DLM_MSG_GRANT):
3437  		if (!lkb->lkb_lvbptr || !(lkb->lkb_exflags & DLM_LKF_VALBLK))
3438  			break;
3439  		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3440  		break;
3441  	}
3442  }
3443  
3444  static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3445  {
3446  	struct dlm_message *ms;
3447  	struct dlm_mhandle *mh;
3448  	int to_nodeid, error;
3449  
3450  	to_nodeid = r->res_nodeid;
3451  
3452  	error = add_to_waiters(lkb, mstype, to_nodeid);
3453  	if (error)
3454  		return error;
3455  
3456  	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh, GFP_NOFS);
3457  	if (error)
3458  		goto fail;
3459  
3460  	send_args(r, lkb, ms);
3461  
3462  	error = send_message(mh, ms, r->res_name, r->res_length);
3463  	if (error)
3464  		goto fail;
3465  	return 0;
3466  
3467   fail:
3468  	remove_from_waiters(lkb, msg_reply_type(mstype));
3469  	return error;
3470  }
3471  
3472  static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3473  {
3474  	return send_common(r, lkb, DLM_MSG_REQUEST);
3475  }
3476  
3477  static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3478  {
3479  	int error;
3480  
3481  	error = send_common(r, lkb, DLM_MSG_CONVERT);
3482  
3483  	/* down conversions go without a reply from the master */
3484  	if (!error && down_conversion(lkb)) {
3485  		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3486  		r->res_ls->ls_local_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
3487  		r->res_ls->ls_local_ms.m_result = 0;
3488  		__receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms, true);
3489  	}
3490  
3491  	return error;
3492  }
3493  
3494  /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3495     MASTER_UNCERTAIN to force the next request on the rsb to confirm
3496     that the master is still correct. */
3497  
3498  static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3499  {
3500  	return send_common(r, lkb, DLM_MSG_UNLOCK);
3501  }
3502  
3503  static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3504  {
3505  	return send_common(r, lkb, DLM_MSG_CANCEL);
3506  }
3507  
3508  static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3509  {
3510  	struct dlm_message *ms;
3511  	struct dlm_mhandle *mh;
3512  	int to_nodeid, error;
3513  
3514  	to_nodeid = lkb->lkb_nodeid;
3515  
3516  	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh,
3517  			       GFP_NOFS);
3518  	if (error)
3519  		goto out;
3520  
3521  	send_args(r, lkb, ms);
3522  
3523  	ms->m_result = 0;
3524  
3525  	error = send_message(mh, ms, r->res_name, r->res_length);
3526   out:
3527  	return error;
3528  }
3529  
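/* send_bast() is likewise a one-way message from the master to the process
   node; the mode being requested by the blocked lock is passed in as an
   argument and carried in m_bastmode rather than taken from the lkb. */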
3530  static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3531  {
3532  	struct dlm_message *ms;
3533  	struct dlm_mhandle *mh;
3534  	int to_nodeid, error;
3535  
3536  	to_nodeid = lkb->lkb_nodeid;
3537  
3538  	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh,
3539  			       GFP_NOFS);
3540  	if (error)
3541  		goto out;
3542  
3543  	send_args(r, lkb, ms);
3544  
3545  	ms->m_bastmode = cpu_to_le32(mode);
3546  
3547  	error = send_message(mh, ms, r->res_name, r->res_length);
3548   out:
3549  	return error;
3550  }
3551  
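/* send_lookup() asks the directory node which node masters this rsb.  Like
   send_common(), it adds the lkb to the waiters list first so the lookup
   reply can be matched. */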
3552  static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3553  {
3554  	struct dlm_message *ms;
3555  	struct dlm_mhandle *mh;
3556  	int to_nodeid, error;
3557  
3558  	to_nodeid = dlm_dir_nodeid(r);
3559  
3560  	error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3561  	if (error)
3562  		return error;
3563  
3564  	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh,
3565  			       GFP_NOFS);
3566  	if (error)
3567  		goto fail;
3568  
3569  	send_args(r, lkb, ms);
3570  
3571  	error = send_message(mh, ms, r->res_name, r->res_length);
3572  	if (error)
3573  		goto fail;
3574  	return 0;
3575  
3576   fail:
3577  	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3578  	return error;
3579  }
3580  
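/* send_remove() is sent to the resource's directory node when we free an
   unused rsb we were master of, so the directory node can drop its own
   copy; there is no lkb and no reply, only the resource name and hash are
   sent. */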
3581  static int send_remove(struct dlm_rsb *r)
3582  {
3583  	struct dlm_message *ms;
3584  	struct dlm_mhandle *mh;
3585  	int to_nodeid, error;
3586  
3587  	to_nodeid = dlm_dir_nodeid(r);
3588  
3589  	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh,
3590  			       GFP_ATOMIC);
3591  	if (error)
3592  		goto out;
3593  
3594  	memcpy(ms->m_extra, r->res_name, r->res_length);
3595  	ms->m_hash = cpu_to_le32(r->res_hash);
3596  
3597  	error = send_message(mh, ms, r->res_name, r->res_length);
3598   out:
3599  	return error;
3600  }
3601  
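/* send_common_reply() builds the reply messages sent from the master back
   to the process node; the local return value is translated to an on-wire
   dlm errno in m_result. */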
3602  static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3603  			     int mstype, int rv)
3604  {
3605  	struct dlm_message *ms;
3606  	struct dlm_mhandle *mh;
3607  	int to_nodeid, error;
3608  
3609  	to_nodeid = lkb->lkb_nodeid;
3610  
3611  	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh, GFP_NOFS);
3612  	if (error)
3613  		goto out;
3614  
3615  	send_args(r, lkb, ms);
3616  
3617  	ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3618  
3619  	error = send_message(mh, ms, r->res_name, r->res_length);
3620   out:
3621  	return error;
3622  }
3623  
3624  static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3625  {
3626  	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3627  }
3628  
3629  static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3630  {
3631  	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3632  }
3633  
3634  static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3635  {
3636  	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3637  }
3638  
3639  static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3640  {
3641  	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3642  }
3643  
3644  static int send_lookup_reply(struct dlm_ls *ls,
3645  			     const struct dlm_message *ms_in, int ret_nodeid,
3646  			     int rv)
3647  {
3648  	struct dlm_rsb *r = &ls->ls_local_rsb;
3649  	struct dlm_message *ms;
3650  	struct dlm_mhandle *mh;
3651  	int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
3652  
3653  	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh,
3654  			       GFP_NOFS);
3655  	if (error)
3656  		goto out;
3657  
3658  	ms->m_lkid = ms_in->m_lkid;
3659  	ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3660  	ms->m_nodeid = cpu_to_le32(ret_nodeid);
3661  
3662  	error = send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in));
3663   out:
3664  	return error;
3665  }
3666  
3667  /* which args we save from a received message depends heavily on the type
3668     of message, unlike the send side where we can safely send everything about
3669     the lkb for any type of message */
3670  
3671  static void receive_flags(struct dlm_lkb *lkb, const struct dlm_message *ms)
3672  {
3673  	lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
3674  	dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3675  	dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3676  }
3677  
3678  static void receive_flags_reply(struct dlm_lkb *lkb,
3679  				const struct dlm_message *ms,
3680  				bool local)
3681  {
3682  	if (local)
3683  		return;
3684  
3685  	dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3686  	dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3687  }
3688  
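/* Length of the variable part (resource name or lvb) that follows the
   fixed-size message structure. */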
3689  static int receive_extralen(const struct dlm_message *ms)
3690  {
3691  	return (le16_to_cpu(ms->m_header.h_length) -
3692  		sizeof(struct dlm_message));
3693  }
3694  
3695  static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3696  		       const struct dlm_message *ms)
3697  {
3698  	int len;
3699  
3700  	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3701  		if (!lkb->lkb_lvbptr)
3702  			lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3703  		if (!lkb->lkb_lvbptr)
3704  			return -ENOMEM;
3705  		len = receive_extralen(ms);
3706  		if (len > ls->ls_lvblen)
3707  			len = ls->ls_lvblen;
3708  		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3709  	}
3710  	return 0;
3711  }
3712  
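/* Master copies of remote lkbs have no real callback functions; these
   placeholders just record that the process copy registered a bast/cast,
   which send_args() translates back into the m_asts bits. */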
3713  static void fake_bastfn(void *astparam, int mode)
3714  {
3715  	log_print("fake_bastfn should not be called");
3716  }
3717  
3718  static void fake_astfn(void *astparam)
3719  {
3720  	log_print("fake_astfn should not be called");
3721  }
3722  
3723  static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3724  				const struct dlm_message *ms)
3725  {
3726  	lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3727  	lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
3728  	lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3729  	lkb->lkb_grmode = DLM_LOCK_IV;
3730  	lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3731  
3732  	lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;
3733  	lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL;
3734  
3735  	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3736  		/* lkb was just created so there won't be an lvb yet */
3737  		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3738  		if (!lkb->lkb_lvbptr)
3739  			return -ENOMEM;
3740  	}
3741  
3742  	return 0;
3743  }
3744  
3745  static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3746  				const struct dlm_message *ms)
3747  {
3748  	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3749  		return -EBUSY;
3750  
3751  	if (receive_lvb(ls, lkb, ms))
3752  		return -ENOMEM;
3753  
3754  	lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3755  	lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
3756  
3757  	return 0;
3758  }
3759  
3760  static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3761  			       const struct dlm_message *ms)
3762  {
3763  	if (receive_lvb(ls, lkb, ms))
3764  		return -ENOMEM;
3765  	return 0;
3766  }
3767  
3768  /* We fill in the local-lkb fields with the info that send_xxxx_reply()
3769     uses to send a reply and that the remote end uses to process the reply. */
3770  
3771  static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms)
3772  {
3773  	struct dlm_lkb *lkb = &ls->ls_local_lkb;
3774  	lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3775  	lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3776  }
3777  
3778  /* This is called after the rsb is locked so that we can safely inspect
3779     fields in the lkb. */
3780  
3781  static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
3782  {
3783  	int from = le32_to_cpu(ms->m_header.h_nodeid);
3784  	int error = 0;
3785  
3786  	/* currently, mixing user and kernel locks is not supported */
3787  	if (ms->m_flags & cpu_to_le32(BIT(DLM_DFL_USER_BIT)) &&
3788  	    !test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
3789  		log_error(lkb->lkb_resource->res_ls,
3790  			  "got user dlm message for a kernel lock");
3791  		error = -EINVAL;
3792  		goto out;
3793  	}
3794  
3795  	switch (ms->m_type) {
3796  	case cpu_to_le32(DLM_MSG_CONVERT):
3797  	case cpu_to_le32(DLM_MSG_UNLOCK):
3798  	case cpu_to_le32(DLM_MSG_CANCEL):
3799  		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3800  			error = -EINVAL;
3801  		break;
3802  
3803  	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3804  	case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
3805  	case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
3806  	case cpu_to_le32(DLM_MSG_GRANT):
3807  	case cpu_to_le32(DLM_MSG_BAST):
3808  		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3809  			error = -EINVAL;
3810  		break;
3811  
3812  	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3813  		if (!is_process_copy(lkb))
3814  			error = -EINVAL;
3815  		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3816  			error = -EINVAL;
3817  		break;
3818  
3819  	default:
3820  		error = -EINVAL;
3821  	}
3822  
3823  out:
3824  	if (error)
3825  		log_error(lkb->lkb_resource->res_ls,
3826  			  "ignore invalid message %d from %d %x %x %x %d",
3827  			  le32_to_cpu(ms->m_type), from, lkb->lkb_id,
3828  			  lkb->lkb_remid, dlm_iflags_val(lkb),
3829  			  lkb->lkb_nodeid);
3830  	return error;
3831  }
3832  
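/* receive_request() runs on the prospective master node: create a
   master-copy lkb, find (or recreate) the rsb, run do_request() and send
   the outcome back with send_request_reply(). */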
3833  static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms)
3834  {
3835  	struct dlm_lkb *lkb;
3836  	struct dlm_rsb *r;
3837  	int from_nodeid;
3838  	int error, namelen = 0;
3839  
3840  	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3841  
3842  	error = create_lkb(ls, &lkb);
3843  	if (error)
3844  		goto fail;
3845  
3846  	receive_flags(lkb, ms);
3847  	set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
3848  	error = receive_request_args(ls, lkb, ms);
3849  	if (error) {
3850  		__put_lkb(ls, lkb);
3851  		goto fail;
3852  	}
3853  
3854  	/* The dir node is the authority on whether we are the master
3855  	   for this rsb or not, so if the dir node sends us a request, we
3856  	   should recreate the rsb if we've destroyed it.  This race happens
3857  	   when we send a remove message to the dir node at the same time
3858  	   that the dir node sends us a request for the rsb. */
3859  
3860  	namelen = receive_extralen(ms);
3861  
3862  	error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
3863  			 R_RECEIVE_REQUEST, &r);
3864  	if (error) {
3865  		__put_lkb(ls, lkb);
3866  		goto fail;
3867  	}
3868  
3869  	lock_rsb(r);
3870  
3871  	if (r->res_master_nodeid != dlm_our_nodeid()) {
3872  		error = validate_master_nodeid(ls, r, from_nodeid);
3873  		if (error) {
3874  			unlock_rsb(r);
3875  			put_rsb(r);
3876  			__put_lkb(ls, lkb);
3877  			goto fail;
3878  		}
3879  	}
3880  
3881  	attach_lkb(r, lkb);
3882  	error = do_request(r, lkb);
3883  	send_request_reply(r, lkb, error);
3884  	do_request_effects(r, lkb, error);
3885  
3886  	unlock_rsb(r);
3887  	put_rsb(r);
3888  
3889  	if (error == -EINPROGRESS)
3890  		error = 0;
3891  	if (error)
3892  		dlm_put_lkb(lkb);
3893  	return 0;
3894  
3895   fail:
3896  	/* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
3897  	   and do this receive_request again from process_lookup_list once
3898  	   we get the lookup reply.  This would avoid many repeated
3899  	   ENOTBLK request failures when the lookup reply designating us
3900  	   as master is delayed. */
3901  
3902  	if (error != -ENOTBLK) {
3903  		log_limit(ls, "receive_request %x from %d %d",
3904  			  le32_to_cpu(ms->m_lkid), from_nodeid, error);
3905  	}
3906  
3907  	setup_local_lkb(ls, ms);
3908  	send_request_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
3909  	return error;
3910  }
3911  
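/* receive_convert(), receive_unlock() and receive_cancel() run on the
   master node like receive_request(), but operate on an existing
   master-copy lkb found by the remote lock id, and validate the message
   against that lkb before acting on it. */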
3912  static int receive_convert(struct dlm_ls *ls, const struct dlm_message *ms)
3913  {
3914  	struct dlm_lkb *lkb;
3915  	struct dlm_rsb *r;
3916  	int error, reply = 1;
3917  
3918  	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
3919  	if (error)
3920  		goto fail;
3921  
3922  	if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
3923  		log_error(ls, "receive_convert %x remid %x recover_seq %llu "
3924  			  "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
3925  			  (unsigned long long)lkb->lkb_recover_seq,
3926  			  le32_to_cpu(ms->m_header.h_nodeid),
3927  			  le32_to_cpu(ms->m_lkid));
3928  		error = -ENOENT;
3929  		dlm_put_lkb(lkb);
3930  		goto fail;
3931  	}
3932  
3933  	r = lkb->lkb_resource;
3934  
3935  	hold_rsb(r);
3936  	lock_rsb(r);
3937  
3938  	error = validate_message(lkb, ms);
3939  	if (error)
3940  		goto out;
3941  
3942  	receive_flags(lkb, ms);
3943  
3944  	error = receive_convert_args(ls, lkb, ms);
3945  	if (error) {
3946  		send_convert_reply(r, lkb, error);
3947  		goto out;
3948  	}
3949  
3950  	reply = !down_conversion(lkb);
3951  
3952  	error = do_convert(r, lkb);
3953  	if (reply)
3954  		send_convert_reply(r, lkb, error);
3955  	do_convert_effects(r, lkb, error);
3956   out:
3957  	unlock_rsb(r);
3958  	put_rsb(r);
3959  	dlm_put_lkb(lkb);
3960  	return 0;
3961  
3962   fail:
3963  	setup_local_lkb(ls, ms);
3964  	send_convert_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
3965  	return error;
3966  }
3967  
3968  static int receive_unlock(struct dlm_ls *ls, const struct dlm_message *ms)
3969  {
3970  	struct dlm_lkb *lkb;
3971  	struct dlm_rsb *r;
3972  	int error;
3973  
3974  	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
3975  	if (error)
3976  		goto fail;
3977  
3978  	if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
3979  		log_error(ls, "receive_unlock %x remid %x remote %d %x",
3980  			  lkb->lkb_id, lkb->lkb_remid,
3981  			  le32_to_cpu(ms->m_header.h_nodeid),
3982  			  le32_to_cpu(ms->m_lkid));
3983  		error = -ENOENT;
3984  		dlm_put_lkb(lkb);
3985  		goto fail;
3986  	}
3987  
3988  	r = lkb->lkb_resource;
3989  
3990  	hold_rsb(r);
3991  	lock_rsb(r);
3992  
3993  	error = validate_message(lkb, ms);
3994  	if (error)
3995  		goto out;
3996  
3997  	receive_flags(lkb, ms);
3998  
3999  	error = receive_unlock_args(ls, lkb, ms);
4000  	if (error) {
4001  		send_unlock_reply(r, lkb, error);
4002  		goto out;
4003  	}
4004  
4005  	error = do_unlock(r, lkb);
4006  	send_unlock_reply(r, lkb, error);
4007  	do_unlock_effects(r, lkb, error);
4008   out:
4009  	unlock_rsb(r);
4010  	put_rsb(r);
4011  	dlm_put_lkb(lkb);
4012  	return 0;
4013  
4014   fail:
4015  	setup_local_lkb(ls, ms);
4016  	send_unlock_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4017  	return error;
4018  }
4019  
4020  static int receive_cancel(struct dlm_ls *ls, const struct dlm_message *ms)
4021  {
4022  	struct dlm_lkb *lkb;
4023  	struct dlm_rsb *r;
4024  	int error;
4025  
4026  	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4027  	if (error)
4028  		goto fail;
4029  
4030  	receive_flags(lkb, ms);
4031  
4032  	r = lkb->lkb_resource;
4033  
4034  	hold_rsb(r);
4035  	lock_rsb(r);
4036  
4037  	error = validate_message(lkb, ms);
4038  	if (error)
4039  		goto out;
4040  
4041  	error = do_cancel(r, lkb);
4042  	send_cancel_reply(r, lkb, error);
4043  	do_cancel_effects(r, lkb, error);
4044   out:
4045  	unlock_rsb(r);
4046  	put_rsb(r);
4047  	dlm_put_lkb(lkb);
4048  	return 0;
4049  
4050   fail:
4051  	setup_local_lkb(ls, ms);
4052  	send_cancel_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4053  	return error;
4054  }
4055  
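/* receive_grant() and receive_bast() handle the two asynchronous messages
   a process node can receive from the master: its lock has been granted,
   or a blocking callback should be queued for it. */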
4056  static int receive_grant(struct dlm_ls *ls, const struct dlm_message *ms)
4057  {
4058  	struct dlm_lkb *lkb;
4059  	struct dlm_rsb *r;
4060  	int error;
4061  
4062  	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4063  	if (error)
4064  		return error;
4065  
4066  	r = lkb->lkb_resource;
4067  
4068  	hold_rsb(r);
4069  	lock_rsb(r);
4070  
4071  	error = validate_message(lkb, ms);
4072  	if (error)
4073  		goto out;
4074  
4075  	receive_flags_reply(lkb, ms, false);
4076  	if (is_altmode(lkb))
4077  		munge_altmode(lkb, ms);
4078  	grant_lock_pc(r, lkb, ms);
4079  	queue_cast(r, lkb, 0);
4080   out:
4081  	unlock_rsb(r);
4082  	put_rsb(r);
4083  	dlm_put_lkb(lkb);
4084  	return 0;
4085  }
4086  
4087  static int receive_bast(struct dlm_ls *ls, const struct dlm_message *ms)
4088  {
4089  	struct dlm_lkb *lkb;
4090  	struct dlm_rsb *r;
4091  	int error;
4092  
4093  	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4094  	if (error)
4095  		return error;
4096  
4097  	r = lkb->lkb_resource;
4098  
4099  	hold_rsb(r);
4100  	lock_rsb(r);
4101  
4102  	error = validate_message(lkb, ms);
4103  	if (error)
4104  		goto out;
4105  
4106  	queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode));
4107  	lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
4108   out:
4109  	unlock_rsb(r);
4110  	put_rsb(r);
4111  	dlm_put_lkb(lkb);
4112  	return 0;
4113  }
4114  
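/* receive_lookup() runs on the directory node: resolve (or assign) the
   master for the named resource and reply, unless the lookup resolves to
   ourselves, in which case it is handled directly as a request. */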
4115  static void receive_lookup(struct dlm_ls *ls, const struct dlm_message *ms)
4116  {
4117  	int len, error, ret_nodeid, from_nodeid, our_nodeid;
4118  
4119  	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4120  	our_nodeid = dlm_our_nodeid();
4121  
4122  	len = receive_extralen(ms);
4123  
4124  	error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4125  				  &ret_nodeid, NULL);
4126  
4127  	/* Optimization: we're master so treat lookup as a request */
4128  	if (!error && ret_nodeid == our_nodeid) {
4129  		receive_request(ls, ms);
4130  		return;
4131  	}
4132  	send_lookup_reply(ls, ms, ret_nodeid, error);
4133  }
4134  
4135  static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
4136  {
4137  	char name[DLM_RESNAME_MAXLEN+1];
4138  	struct dlm_rsb *r;
4139  	uint32_t hash, b;
4140  	int rv, len, dir_nodeid, from_nodeid;
4141  
4142  	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4143  
4144  	len = receive_extralen(ms);
4145  
4146  	if (len > DLM_RESNAME_MAXLEN) {
4147  		log_error(ls, "receive_remove from %d bad len %d",
4148  			  from_nodeid, len);
4149  		return;
4150  	}
4151  
4152  	dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash));
4153  	if (dir_nodeid != dlm_our_nodeid()) {
4154  		log_error(ls, "receive_remove from %d bad nodeid %d",
4155  			  from_nodeid, dir_nodeid);
4156  		return;
4157  	}
4158  
4159  	/* Look for the name on rsbtbl.toss; if it's there, kill it.
4160  	   If it's on rsbtbl.keep, it's being used, and we should ignore this
4161  	   message.  This is an expected race between the dir node sending a
4162  	   request to the master node at the same time as the master node sends
4163  	   a remove to the dir node.  The resolution to that race is for the
4164  	   dir node to ignore the remove message, and the master node to
4165  	   recreate the master rsb when it gets a request from the dir node for
4166  	   an rsb it doesn't have. */
4167  
4168  	memset(name, 0, sizeof(name));
4169  	memcpy(name, ms->m_extra, len);
4170  
4171  	hash = jhash(name, len, 0);
4172  	b = hash & (ls->ls_rsbtbl_size - 1);
4173  
4174  	spin_lock(&ls->ls_rsbtbl[b].lock);
4175  
4176  	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
4177  	if (rv) {
4178  		/* verify the rsb is on keep list per comment above */
4179  		rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
4180  		if (rv) {
4181  			/* should not happen */
4182  			log_error(ls, "receive_remove from %d not found %s",
4183  				  from_nodeid, name);
4184  			spin_unlock(&ls->ls_rsbtbl[b].lock);
4185  			return;
4186  		}
4187  		if (r->res_master_nodeid != from_nodeid) {
4188  			/* should not happen */
4189  			log_error(ls, "receive_remove keep from %d master %d",
4190  				  from_nodeid, r->res_master_nodeid);
4191  			dlm_print_rsb(r);
4192  			spin_unlock(&ls->ls_rsbtbl[b].lock);
4193  			return;
4194  		}
4195  
4196  		log_debug(ls, "receive_remove from %d master %d first %x %s",
4197  			  from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4198  			  name);
4199  		spin_unlock(&ls->ls_rsbtbl[b].lock);
4200  		return;
4201  	}
4202  
4203  	if (r->res_master_nodeid != from_nodeid) {
4204  		log_error(ls, "receive_remove toss from %d master %d",
4205  			  from_nodeid, r->res_master_nodeid);
4206  		dlm_print_rsb(r);
4207  		spin_unlock(&ls->ls_rsbtbl[b].lock);
4208  		return;
4209  	}
4210  
4211  	if (kref_put(&r->res_ref, kill_rsb)) {
4212  		rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
4213  		spin_unlock(&ls->ls_rsbtbl[b].lock);
4214  		dlm_free_rsb(r);
4215  	} else {
4216  		log_error(ls, "receive_remove from %d rsb ref error",
4217  			  from_nodeid);
4218  		dlm_print_rsb(r);
4219  		spin_unlock(&ls->ls_rsbtbl[b].lock);
4220  	}
4221  }
4222  
4223  static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms)
4224  {
4225  	do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
4226  }
4227  
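/* receive_request_reply() runs on the process node: match the reply
   against the waiters entry and act on the master's do_request() result,
   which may mean the lock was granted, queued, or refused, then resolve
   any overlapping unlock/cancel that was requested in the meantime. */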
4228  static int receive_request_reply(struct dlm_ls *ls,
4229  				 const struct dlm_message *ms)
4230  {
4231  	struct dlm_lkb *lkb;
4232  	struct dlm_rsb *r;
4233  	int error, mstype, result;
4234  	int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4235  
4236  	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4237  	if (error)
4238  		return error;
4239  
4240  	r = lkb->lkb_resource;
4241  	hold_rsb(r);
4242  	lock_rsb(r);
4243  
4244  	error = validate_message(lkb, ms);
4245  	if (error)
4246  		goto out;
4247  
4248  	mstype = lkb->lkb_wait_type;
4249  	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4250  	if (error) {
4251  		log_error(ls, "receive_request_reply %x remote %d %x result %d",
4252  			  lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid),
4253  			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4254  		dlm_dump_rsb(r);
4255  		goto out;
4256  	}
4257  
4258  	/* Optimization: the dir node was also the master, so it took our
4259  	   lookup as a request and sent a request reply instead of a lookup reply */
4260  	if (mstype == DLM_MSG_LOOKUP) {
4261  		r->res_master_nodeid = from_nodeid;
4262  		r->res_nodeid = from_nodeid;
4263  		lkb->lkb_nodeid = from_nodeid;
4264  	}
4265  
4266  	/* this is the value returned from do_request() on the master */
4267  	result = from_dlm_errno(le32_to_cpu(ms->m_result));
4268  
4269  	switch (result) {
4270  	case -EAGAIN:
4271  		/* request would block (be queued) on remote master */
4272  		queue_cast(r, lkb, -EAGAIN);
4273  		confirm_master(r, -EAGAIN);
4274  		unhold_lkb(lkb); /* undoes create_lkb() */
4275  		break;
4276  
4277  	case -EINPROGRESS:
4278  	case 0:
4279  		/* request was queued or granted on remote master */
4280  		receive_flags_reply(lkb, ms, false);
4281  		lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
4282  		if (is_altmode(lkb))
4283  			munge_altmode(lkb, ms);
4284  		if (result) {
4285  			add_lkb(r, lkb, DLM_LKSTS_WAITING);
4286  		} else {
4287  			grant_lock_pc(r, lkb, ms);
4288  			queue_cast(r, lkb, 0);
4289  		}
4290  		confirm_master(r, result);
4291  		break;
4292  
4293  	case -EBADR:
4294  	case -ENOTBLK:
4295  		/* find_rsb failed to find rsb or rsb wasn't master */
4296  		log_limit(ls, "receive_request_reply %x from %d %d "
4297  			  "master %d dir %d first %x %s", lkb->lkb_id,
4298  			  from_nodeid, result, r->res_master_nodeid,
4299  			  r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4300  
4301  		if (r->res_dir_nodeid != dlm_our_nodeid() &&
4302  		    r->res_master_nodeid != dlm_our_nodeid()) {
4303  			/* cause _request_lock->set_master->send_lookup */
4304  			r->res_master_nodeid = 0;
4305  			r->res_nodeid = -1;
4306  			lkb->lkb_nodeid = -1;
4307  		}
4308  
4309  		if (is_overlap(lkb)) {
4310  			/* we'll ignore error in cancel/unlock reply */
4311  			queue_cast_overlap(r, lkb);
4312  			confirm_master(r, result);
4313  			unhold_lkb(lkb); /* undoes create_lkb() */
4314  		} else {
4315  			_request_lock(r, lkb);
4316  
4317  			if (r->res_master_nodeid == dlm_our_nodeid())
4318  				confirm_master(r, 0);
4319  		}
4320  		break;
4321  
4322  	default:
4323  		log_error(ls, "receive_request_reply %x error %d",
4324  			  lkb->lkb_id, result);
4325  	}
4326  
4327  	if ((result == 0 || result == -EINPROGRESS) &&
4328  	    test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
4329  		log_debug(ls, "receive_request_reply %x result %d unlock",
4330  			  lkb->lkb_id, result);
4331  		clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4332  		send_unlock(r, lkb);
4333  	} else if ((result == -EINPROGRESS) &&
4334  		   test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
4335  				      &lkb->lkb_iflags)) {
4336  		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4337  		clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4338  		send_cancel(r, lkb);
4339  	} else {
4340  		clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4341  		clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4342  	}
4343   out:
4344  	unlock_rsb(r);
4345  	put_rsb(r);
4346  	dlm_put_lkb(lkb);
4347  	return 0;
4348  }
4349  
4350  static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4351  				    const struct dlm_message *ms, bool local)
4352  {
4353  	/* this is the value returned from do_convert() on the master */
4354  	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4355  	case -EAGAIN:
4356  		/* convert would block (be queued) on remote master */
4357  		queue_cast(r, lkb, -EAGAIN);
4358  		break;
4359  
4360  	case -EDEADLK:
4361  		receive_flags_reply(lkb, ms, local);
4362  		revert_lock_pc(r, lkb);
4363  		queue_cast(r, lkb, -EDEADLK);
4364  		break;
4365  
4366  	case -EINPROGRESS:
4367  		/* convert was queued on remote master */
4368  		receive_flags_reply(lkb, ms, local);
4369  		if (is_demoted(lkb))
4370  			munge_demoted(lkb);
4371  		del_lkb(r, lkb);
4372  		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4373  		break;
4374  
4375  	case 0:
4376  		/* convert was granted on remote master */
4377  		receive_flags_reply(lkb, ms, local);
4378  		if (is_demoted(lkb))
4379  			munge_demoted(lkb);
4380  		grant_lock_pc(r, lkb, ms);
4381  		queue_cast(r, lkb, 0);
4382  		break;
4383  
4384  	default:
4385  		log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4386  			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4387  			  le32_to_cpu(ms->m_lkid),
4388  			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4389  		dlm_print_rsb(r);
4390  		dlm_print_lkb(lkb);
4391  	}
4392  }
4393  
4394  static void _receive_convert_reply(struct dlm_lkb *lkb,
4395  				   const struct dlm_message *ms, bool local)
4396  {
4397  	struct dlm_rsb *r = lkb->lkb_resource;
4398  	int error;
4399  
4400  	hold_rsb(r);
4401  	lock_rsb(r);
4402  
4403  	error = validate_message(lkb, ms);
4404  	if (error)
4405  		goto out;
4406  
4407  	/* local reply can happen with waiters_mutex held */
4408  	error = remove_from_waiters_ms(lkb, ms, local);
4409  	if (error)
4410  		goto out;
4411  
4412  	__receive_convert_reply(r, lkb, ms, local);
4413   out:
4414  	unlock_rsb(r);
4415  	put_rsb(r);
4416  }
4417  
4418  static int receive_convert_reply(struct dlm_ls *ls,
4419  				 const struct dlm_message *ms)
4420  {
4421  	struct dlm_lkb *lkb;
4422  	int error;
4423  
4424  	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4425  	if (error)
4426  		return error;
4427  
4428  	_receive_convert_reply(lkb, ms, false);
4429  	dlm_put_lkb(lkb);
4430  	return 0;
4431  }
4432  
4433  static void _receive_unlock_reply(struct dlm_lkb *lkb,
4434  				  const struct dlm_message *ms, bool local)
4435  {
4436  	struct dlm_rsb *r = lkb->lkb_resource;
4437  	int error;
4438  
4439  	hold_rsb(r);
4440  	lock_rsb(r);
4441  
4442  	error = validate_message(lkb, ms);
4443  	if (error)
4444  		goto out;
4445  
4446  	/* local reply can happen with waiters_mutex held */
4447  	error = remove_from_waiters_ms(lkb, ms, local);
4448  	if (error)
4449  		goto out;
4450  
4451  	/* this is the value returned from do_unlock() on the master */
4452  
4453  	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4454  	case -DLM_EUNLOCK:
4455  		receive_flags_reply(lkb, ms, local);
4456  		remove_lock_pc(r, lkb);
4457  		queue_cast(r, lkb, -DLM_EUNLOCK);
4458  		break;
4459  	case -ENOENT:
4460  		break;
4461  	default:
4462  		log_error(r->res_ls, "receive_unlock_reply %x error %d",
4463  			  lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
4464  	}
4465   out:
4466  	unlock_rsb(r);
4467  	put_rsb(r);
4468  }
4469  
4470  static int receive_unlock_reply(struct dlm_ls *ls,
4471  				const struct dlm_message *ms)
4472  {
4473  	struct dlm_lkb *lkb;
4474  	int error;
4475  
4476  	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4477  	if (error)
4478  		return error;
4479  
4480  	_receive_unlock_reply(lkb, ms, false);
4481  	dlm_put_lkb(lkb);
4482  	return 0;
4483  }
4484  
4485  static void _receive_cancel_reply(struct dlm_lkb *lkb,
4486  				  const struct dlm_message *ms, bool local)
4487  {
4488  	struct dlm_rsb *r = lkb->lkb_resource;
4489  	int error;
4490  
4491  	hold_rsb(r);
4492  	lock_rsb(r);
4493  
4494  	error = validate_message(lkb, ms);
4495  	if (error)
4496  		goto out;
4497  
4498  	/* local reply can happen with waiters_mutex held */
4499  	error = remove_from_waiters_ms(lkb, ms, local);
4500  	if (error)
4501  		goto out;
4502  
4503  	/* this is the value returned from do_cancel() on the master */
4504  
4505  	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4506  	case -DLM_ECANCEL:
4507  		receive_flags_reply(lkb, ms, local);
4508  		revert_lock_pc(r, lkb);
4509  		queue_cast(r, lkb, -DLM_ECANCEL);
4510  		break;
4511  	case 0:
4512  		break;
4513  	default:
4514  		log_error(r->res_ls, "receive_cancel_reply %x error %d",
4515  			  lkb->lkb_id,
4516  			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4517  	}
4518   out:
4519  	unlock_rsb(r);
4520  	put_rsb(r);
4521  }
4522  
4523  static int receive_cancel_reply(struct dlm_ls *ls,
4524  				const struct dlm_message *ms)
4525  {
4526  	struct dlm_lkb *lkb;
4527  	int error;
4528  
4529  	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4530  	if (error)
4531  		return error;
4532  
4533  	_receive_cancel_reply(lkb, ms, false);
4534  	dlm_put_lkb(lkb);
4535  	return 0;
4536  }
4537  
4538  static void receive_lookup_reply(struct dlm_ls *ls,
4539  				 const struct dlm_message *ms)
4540  {
4541  	struct dlm_lkb *lkb;
4542  	struct dlm_rsb *r;
4543  	int error, ret_nodeid;
4544  	int do_lookup_list = 0;
4545  
4546  	error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
4547  	if (error) {
4548  		log_error(ls, "%s no lkid %x", __func__,
4549  			  le32_to_cpu(ms->m_lkid));
4550  		return;
4551  	}
4552  
4553  	/* ms->m_result is the value returned by dlm_master_lookup on dir node
4554  	   FIXME: will a non-zero error ever be returned? */
4555  
4556  	r = lkb->lkb_resource;
4557  	hold_rsb(r);
4558  	lock_rsb(r);
4559  
4560  	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4561  	if (error)
4562  		goto out;
4563  
4564  	ret_nodeid = le32_to_cpu(ms->m_nodeid);
4565  
4566  	/* We sometimes receive a request from the dir node for this
4567  	   rsb before we've received the dir node's lookup_reply for it.
4568  	   The request from the dir node implies we're the master, so we set
4569  	   ourself as master in receive_request_reply, and verify here that
4570  	   we are indeed the master. */
4571  
4572  	if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4573  		/* This should never happen */
4574  		log_error(ls, "receive_lookup_reply %x from %d ret %d "
4575  			  "master %d dir %d our %d first %x %s",
4576  			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4577  			  ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
4578  			  dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4579  	}
4580  
4581  	if (ret_nodeid == dlm_our_nodeid()) {
4582  		r->res_master_nodeid = ret_nodeid;
4583  		r->res_nodeid = 0;
4584  		do_lookup_list = 1;
4585  		r->res_first_lkid = 0;
4586  	} else if (ret_nodeid == -1) {
4587  		/* the remote node doesn't believe it's the dir node */
4588  		log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4589  			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
4590  		r->res_master_nodeid = 0;
4591  		r->res_nodeid = -1;
4592  		lkb->lkb_nodeid = -1;
4593  	} else {
4594  		/* set_master() will set lkb_nodeid from r */
4595  		r->res_master_nodeid = ret_nodeid;
4596  		r->res_nodeid = ret_nodeid;
4597  	}
4598  
4599  	if (is_overlap(lkb)) {
4600  		log_debug(ls, "receive_lookup_reply %x unlock %x",
4601  			  lkb->lkb_id, dlm_iflags_val(lkb));
4602  		queue_cast_overlap(r, lkb);
4603  		unhold_lkb(lkb); /* undoes create_lkb() */
4604  		goto out_list;
4605  	}
4606  
4607  	_request_lock(r, lkb);
4608  
4609   out_list:
4610  	if (do_lookup_list)
4611  		process_lookup_list(r);
4612   out:
4613  	unlock_rsb(r);
4614  	put_rsb(r);
4615  	dlm_put_lkb(lkb);
4616  }
4617  
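/* Dispatch a single message to the handler for its type.  Messages from
   nodes that are not lockspace members are dropped, and ENOENT/EINVAL
   results from the handlers are logged according to whether they can
   occur in normal operation. */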
4618  static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4619  			     uint32_t saved_seq)
4620  {
4621  	int error = 0, noent = 0;
4622  
4623  	if (WARN_ON_ONCE(!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid)))) {
4624  		log_limit(ls, "receive %d from non-member %d %x %x %d",
4625  			  le32_to_cpu(ms->m_type),
4626  			  le32_to_cpu(ms->m_header.h_nodeid),
4627  			  le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4628  			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4629  		return;
4630  	}
4631  
4632  	switch (ms->m_type) {
4633  
4634  	/* messages sent to a master node */
4635  
4636  	case cpu_to_le32(DLM_MSG_REQUEST):
4637  		error = receive_request(ls, ms);
4638  		break;
4639  
4640  	case cpu_to_le32(DLM_MSG_CONVERT):
4641  		error = receive_convert(ls, ms);
4642  		break;
4643  
4644  	case cpu_to_le32(DLM_MSG_UNLOCK):
4645  		error = receive_unlock(ls, ms);
4646  		break;
4647  
4648  	case cpu_to_le32(DLM_MSG_CANCEL):
4649  		noent = 1;
4650  		error = receive_cancel(ls, ms);
4651  		break;
4652  
4653  	/* messages sent from a master node (replies to above) */
4654  
4655  	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
4656  		error = receive_request_reply(ls, ms);
4657  		break;
4658  
4659  	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
4660  		error = receive_convert_reply(ls, ms);
4661  		break;
4662  
4663  	case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
4664  		error = receive_unlock_reply(ls, ms);
4665  		break;
4666  
4667  	case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
4668  		error = receive_cancel_reply(ls, ms);
4669  		break;
4670  
4671  	/* messages sent from a master node (only two types of async msg) */
4672  
4673  	case cpu_to_le32(DLM_MSG_GRANT):
4674  		noent = 1;
4675  		error = receive_grant(ls, ms);
4676  		break;
4677  
4678  	case cpu_to_le32(DLM_MSG_BAST):
4679  		noent = 1;
4680  		error = receive_bast(ls, ms);
4681  		break;
4682  
4683  	/* messages sent to a dir node */
4684  
4685  	case cpu_to_le32(DLM_MSG_LOOKUP):
4686  		receive_lookup(ls, ms);
4687  		break;
4688  
4689  	case cpu_to_le32(DLM_MSG_REMOVE):
4690  		receive_remove(ls, ms);
4691  		break;
4692  
4693  	/* messages sent from a dir node (remove has no reply) */
4694  
4695  	case cpu_to_le32(DLM_MSG_LOOKUP_REPLY):
4696  		receive_lookup_reply(ls, ms);
4697  		break;
4698  
4699  	/* other messages */
4700  
4701  	case cpu_to_le32(DLM_MSG_PURGE):
4702  		receive_purge(ls, ms);
4703  		break;
4704  
4705  	default:
4706  		log_error(ls, "unknown message type %d",
4707  			  le32_to_cpu(ms->m_type));
4708  	}
4709  
4710  	/*
4711  	 * When checking for ENOENT, we're checking the result of
4712  	 * find_lkb(m_remid):
4713  	 *
4714  	 * The lock id referenced in the message wasn't found.  This may
4715  	 * happen in normal usage for the async messages and cancel, so
4716  	 * only use log_debug for them.
4717  	 *
4718  	 * Some errors are expected and normal.
4719  	 */
4720  
4721  	if (error == -ENOENT && noent) {
4722  		log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4723  			  le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4724  			  le32_to_cpu(ms->m_header.h_nodeid),
4725  			  le32_to_cpu(ms->m_lkid), saved_seq);
4726  	} else if (error == -ENOENT) {
4727  		log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4728  			  le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4729  			  le32_to_cpu(ms->m_header.h_nodeid),
4730  			  le32_to_cpu(ms->m_lkid), saved_seq);
4731  
4732  		if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT))
4733  			dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash));
4734  	}
4735  
4736  	if (error == -EINVAL) {
4737  		log_error(ls, "receive %d inval from %d lkid %x remid %x "
4738  			  "saved_seq %u",
4739  			  le32_to_cpu(ms->m_type),
4740  			  le32_to_cpu(ms->m_header.h_nodeid),
4741  			  le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4742  			  saved_seq);
4743  	}
4744  }
4745  
4746  /* If the lockspace is in recovery mode (locking stopped), then normal
4747     messages are saved on the requestqueue for processing after recovery is
4748     done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4749     messages off the requestqueue before we process new ones. This occurs right
4750     after recovery completes when we transition from saving all messages on
4751     requestqueue, to processing all the saved messages, to processing new
4752     messages as they arrive. */
4753  
4754  static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4755  				int nodeid)
4756  {
4757  	if (dlm_locking_stopped(ls)) {
4758  		/* If we were a member of this lockspace, left, and rejoined,
4759  		   other nodes may still be sending us messages from the
4760  		   lockspace generation before we left. */
4761  		if (WARN_ON_ONCE(!ls->ls_generation)) {
4762  			log_limit(ls, "receive %d from %d ignore old gen",
4763  				  le32_to_cpu(ms->m_type), nodeid);
4764  			return;
4765  		}
4766  
4767  		dlm_add_requestqueue(ls, nodeid, ms);
4768  	} else {
4769  		dlm_wait_requestqueue(ls);
4770  		_receive_message(ls, ms, 0);
4771  	}
4772  }
4773  
4774  /* This is called by dlm_recoverd to process messages that were saved on
4775     the requestqueue. */
4776  
4777  void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
4778  			       uint32_t saved_seq)
4779  {
4780  	_receive_message(ls, ms, saved_seq);
4781  }
4782  
4783  /* This is called by the midcomms layer when something is received for
4784     the lockspace.  It could be either a MSG (normal message sent as part of
4785     standard locking activity) or an RCOM (recovery message sent as part of
4786     lockspace recovery). */
4787  
4788  void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
4789  {
4790  	const struct dlm_header *hd = &p->header;
4791  	struct dlm_ls *ls;
4792  	int type = 0;
4793  
4794  	switch (hd->h_cmd) {
4795  	case DLM_MSG:
4796  		type = le32_to_cpu(p->message.m_type);
4797  		break;
4798  	case DLM_RCOM:
4799  		type = le32_to_cpu(p->rcom.rc_type);
4800  		break;
4801  	default:
4802  		log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4803  		return;
4804  	}
4805  
4806  	if (le32_to_cpu(hd->h_nodeid) != nodeid) {
4807  		log_print("invalid h_nodeid %d from %d lockspace %x",
4808  			  le32_to_cpu(hd->h_nodeid), nodeid,
4809  			  le32_to_cpu(hd->u.h_lockspace));
4810  		return;
4811  	}
4812  
4813  	ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace));
4814  	if (!ls) {
4815  		if (dlm_config.ci_log_debug) {
4816  			printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
4817  				"%u from %d cmd %d type %d\n",
4818  				le32_to_cpu(hd->u.h_lockspace), nodeid,
4819  				hd->h_cmd, type);
4820  		}
4821  
4822  		if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4823  			dlm_send_ls_not_ready(nodeid, &p->rcom);
4824  		return;
4825  	}
4826  
4827  	/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4828  	   be inactive (in this ls) before transitioning to recovery mode */
4829  
4830  	down_read(&ls->ls_recv_active);
4831  	if (hd->h_cmd == DLM_MSG)
4832  		dlm_receive_message(ls, &p->message, nodeid);
4833  	else if (hd->h_cmd == DLM_RCOM)
4834  		dlm_receive_rcom(ls, &p->rcom, nodeid);
4835  	else
4836  		log_error(ls, "invalid h_cmd %d from %d lockspace %x",
4837  			  hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
4838  	up_read(&ls->ls_recv_active);
4839  
4840  	dlm_put_lockspace(ls);
4841  }
4842  
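/* A conversion that was waiting on a failed master is completed locally by
   faking an -EINPROGRESS convert reply; middle-mode (PR/CW) conversions
   additionally reset grmode to IV and flag the rsb for recover_conversion.
   Other up-conversions are simply flagged for resend. */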
4843  static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4844  				   struct dlm_message *ms_local)
4845  {
4846  	if (middle_conversion(lkb)) {
4847  		hold_lkb(lkb);
4848  		memset(ms_local, 0, sizeof(struct dlm_message));
4849  		ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
4850  		ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
4851  		ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
4852  		_receive_convert_reply(lkb, ms_local, true);
4853  
4854  		/* Same special case as in receive_rcom_lock_args() */
4855  		lkb->lkb_grmode = DLM_LOCK_IV;
4856  		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4857  		unhold_lkb(lkb);
4858  
4859  	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4860  		set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
4861  	}
4862  
4863  	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4864  	   conversions are async; there's no reply from the remote master */
4865  }
4866  
4867  /* A waiting lkb needs recovery if the master node has failed, or
4868     the master node is changing (only when no directory is used) */
4869  
4870  static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
4871  				 int dir_nodeid)
4872  {
4873  	if (dlm_no_directory(ls))
4874  		return 1;
4875  
4876  	if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
4877  		return 1;
4878  
4879  	return 0;
4880  }
4881  
4882  /* Recovery for locks that are waiting for replies from nodes that are now
4883     gone.  We can just complete unlocks and cancels by faking a reply from the
4884     dead node.  Requests and up-conversions we flag to be resent after
4885     recovery.  Down-conversions can just be completed with a fake reply like
4886     unlocks.  Conversions between PR and CW need special attention. */
4887  
4888  void dlm_recover_waiters_pre(struct dlm_ls *ls)
4889  {
4890  	struct dlm_lkb *lkb, *safe;
4891  	struct dlm_message *ms_local;
4892  	int wait_type, local_unlock_result, local_cancel_result;
4893  	int dir_nodeid;
4894  
4895  	ms_local = kmalloc(sizeof(*ms_local), GFP_KERNEL);
4896  	if (!ms_local)
4897  		return;
4898  
4899  	mutex_lock(&ls->ls_waiters_mutex);
4900  
4901  	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4902  
4903  		dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
4904  
4905  		/* exclude debug messages about unlocks because there can be so
4906  		   many and they aren't very interesting */
4907  
4908  		if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
4909  			log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
4910  				  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
4911  				  lkb->lkb_id,
4912  				  lkb->lkb_remid,
4913  				  lkb->lkb_wait_type,
4914  				  lkb->lkb_resource->res_nodeid,
4915  				  lkb->lkb_nodeid,
4916  				  lkb->lkb_wait_nodeid,
4917  				  dir_nodeid);
4918  		}
4919  
4920  		/* all outstanding lookups, regardless of destination, will be
4921  		   resent after recovery is done */
4922  
4923  		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4924  			set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
4925  			continue;
4926  		}
4927  
4928  		if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
4929  			continue;
4930  
4931  		wait_type = lkb->lkb_wait_type;
4932  		local_unlock_result = -DLM_EUNLOCK;
4933  		local_cancel_result = -DLM_ECANCEL;
4934  
4935  		/* The main reply may have been received, leaving a zero wait_type,
4936  		   but a reply for the overlapping op may not have been
4937  		   received.  In that case we need to fake the appropriate
4938  		   reply for the overlap op. */
4939  
4940  		if (!wait_type) {
4941  			if (is_overlap_cancel(lkb)) {
4942  				wait_type = DLM_MSG_CANCEL;
4943  				if (lkb->lkb_grmode == DLM_LOCK_IV)
4944  					local_cancel_result = 0;
4945  			}
4946  			if (is_overlap_unlock(lkb)) {
4947  				wait_type = DLM_MSG_UNLOCK;
4948  				if (lkb->lkb_grmode == DLM_LOCK_IV)
4949  					local_unlock_result = -ENOENT;
4950  			}
4951  
4952  			log_debug(ls, "rwpre overlap %x %x %d %d %d",
4953  				  lkb->lkb_id, dlm_iflags_val(lkb), wait_type,
4954  				  local_cancel_result, local_unlock_result);
4955  		}
4956  
4957  		switch (wait_type) {
4958  
4959  		case DLM_MSG_REQUEST:
4960  			set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
4961  			break;
4962  
4963  		case DLM_MSG_CONVERT:
4964  			recover_convert_waiter(ls, lkb, ms_local);
4965  			break;
4966  
4967  		case DLM_MSG_UNLOCK:
4968  			hold_lkb(lkb);
4969  			memset(ms_local, 0, sizeof(struct dlm_message));
4970  			ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
4971  			ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result));
4972  			ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
4973  			_receive_unlock_reply(lkb, ms_local, true);
4974  			dlm_put_lkb(lkb);
4975  			break;
4976  
4977  		case DLM_MSG_CANCEL:
4978  			hold_lkb(lkb);
4979  			memset(ms_local, 0, sizeof(struct dlm_message));
4980  			ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
4981  			ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result));
4982  			ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
4983  			_receive_cancel_reply(lkb, ms_local, true);
4984  			dlm_put_lkb(lkb);
4985  			break;
4986  
4987  		default:
4988  			log_error(ls, "invalid lkb wait_type %d %d",
4989  				  lkb->lkb_wait_type, wait_type);
4990  		}
4991  		schedule();
4992  	}
4993  	mutex_unlock(&ls->ls_waiters_mutex);
4994  	kfree(ms_local);
4995  }
4996  
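/* Return (and hold) the next waiter flagged RESEND by
   dlm_recover_waiters_pre(), or NULL when none remain. */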
4997  static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4998  {
4999  	struct dlm_lkb *lkb = NULL, *iter;
5000  
5001  	mutex_lock(&ls->ls_waiters_mutex);
5002  	list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
5003  		if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
5004  			hold_lkb(iter);
5005  			lkb = iter;
5006  			break;
5007  		}
5008  	}
5009  	mutex_unlock(&ls->ls_waiters_mutex);
5010  
5011  	return lkb;
5012  }
5013  
5014  /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
5015     master or dir-node for r.  Processing the lkb may result in it being placed
5016     back on waiters. */
5017  
5018  /* We do this after normal locking has been enabled and any saved messages
5019     (in requestqueue) have been processed.  We should be confident that at
5020     this point we won't get or process a reply to any of these waiting
5021     operations.  But, new ops may be coming in on the rsbs/locks here from
5022     userspace or remotely. */
5023  
5024  /* There may have been an overlap unlock/cancel prior to recovery or after
5025     recovery.  If before, the lkb may still have a positive wait_count; if
5026     after, the overlap flag would just have been set and nothing new sent.
5027     We can be confident here that any replies to either the initial op or
5028     overlap ops prior to recovery have been received. */
5029  
5030  int dlm_recover_waiters_post(struct dlm_ls *ls)
5031  {
5032  	struct dlm_lkb *lkb;
5033  	struct dlm_rsb *r;
5034  	int error = 0, mstype, err, oc, ou;
5035  
5036  	while (1) {
5037  		if (dlm_locking_stopped(ls)) {
5038  			log_debug(ls, "recover_waiters_post aborted");
5039  			error = -EINTR;
5040  			break;
5041  		}
5042  
5043  		lkb = find_resend_waiter(ls);
5044  		if (!lkb)
5045  			break;
5046  
5047  		r = lkb->lkb_resource;
5048  		hold_rsb(r);
5049  		lock_rsb(r);
5050  
5051  		mstype = lkb->lkb_wait_type;
5052  		oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
5053  					&lkb->lkb_iflags);
5054  		ou = test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT,
5055  					&lkb->lkb_iflags);
5056  		err = 0;
5057  
5058  		log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5059  			  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5060  			  "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5061  			  r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5062  			  dlm_dir_nodeid(r), oc, ou);
5063  
5064  		/* At this point we assume that we won't get a reply to any
5065  		   previous op or overlap op on this lock.  First, do a big
5066  		   remove_from_waiters() for all previous ops. */
5067  
5068  		clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5069  		lkb->lkb_wait_type = 0;
5070  		/* drop all wait_count references; we still
5071  		 * hold a reference for this iteration.
5072  		 */
5073  		while (!atomic_dec_and_test(&lkb->lkb_wait_count))
5074  			unhold_lkb(lkb);
5075  
5076  		mutex_lock(&ls->ls_waiters_mutex);
5077  		list_del_init(&lkb->lkb_wait_reply);
5078  		mutex_unlock(&ls->ls_waiters_mutex);
5079  
5080  		if (oc || ou) {
5081  			/* do an unlock or cancel instead of resending */
5082  			switch (mstype) {
5083  			case DLM_MSG_LOOKUP:
5084  			case DLM_MSG_REQUEST:
5085  				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5086  							-DLM_ECANCEL);
5087  				unhold_lkb(lkb); /* undoes create_lkb() */
5088  				break;
5089  			case DLM_MSG_CONVERT:
5090  				if (oc) {
5091  					queue_cast(r, lkb, -DLM_ECANCEL);
5092  				} else {
5093  					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5094  					_unlock_lock(r, lkb);
5095  				}
5096  				break;
5097  			default:
5098  				err = 1;
5099  			}
5100  		} else {
5101  			switch (mstype) {
5102  			case DLM_MSG_LOOKUP:
5103  			case DLM_MSG_REQUEST:
5104  				_request_lock(r, lkb);
5105  				if (is_master(r))
5106  					confirm_master(r, 0);
5107  				break;
5108  			case DLM_MSG_CONVERT:
5109  				_convert_lock(r, lkb);
5110  				break;
5111  			default:
5112  				err = 1;
5113  			}
5114  		}
5115  
5116  		if (err) {
5117  			log_error(ls, "waiter %x msg %d r_nodeid %d "
5118  				  "dir_nodeid %d overlap %d %d",
5119  				  lkb->lkb_id, mstype, r->res_nodeid,
5120  				  dlm_dir_nodeid(r), oc, ou);
5121  		}
5122  		unlock_rsb(r);
5123  		put_rsb(r);
5124  		dlm_put_lkb(lkb);
5125  	}
5126  
5127  	return error;
5128  }
5129  
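/* Free master-copy lkbs from one rsb queue, skipping those added by
   recover_master_copy during the current recovery sequence. */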
5130  static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5131  			      struct list_head *list)
5132  {
5133  	struct dlm_lkb *lkb, *safe;
5134  
5135  	list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5136  		if (!is_master_copy(lkb))
5137  			continue;
5138  
5139  		/* don't purge lkbs we've added in recover_master_copy for
5140  		   the current recovery seq */
5141  
5142  		if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5143  			continue;
5144  
5145  		del_lkb(r, lkb);
5146  
5147  		/* this put should free the lkb */
5148  		if (!dlm_put_lkb(lkb))
5149  			log_error(ls, "purged mstcpy lkb not released");
5150  	}
5151  }
5152  
5153  void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5154  {
5155  	struct dlm_ls *ls = r->res_ls;
5156  
5157  	purge_mstcpy_list(ls, r, &r->res_grantqueue);
5158  	purge_mstcpy_list(ls, r, &r->res_convertqueue);
5159  	purge_mstcpy_list(ls, r, &r->res_waitqueue);
5160  }
5161  
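/* Remove master-copy lkbs held by departed nodes from one rsb queue,
   flagging the rsb so recovery will invalidate the lvb (if an EX/PW holder
   died) and re-evaluate pending grants. */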
5162  static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5163  			    struct list_head *list,
5164  			    int nodeid_gone, unsigned int *count)
5165  {
5166  	struct dlm_lkb *lkb, *safe;
5167  
5168  	list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5169  		if (!is_master_copy(lkb))
5170  			continue;
5171  
5172  		if ((lkb->lkb_nodeid == nodeid_gone) ||
5173  		    dlm_is_removed(ls, lkb->lkb_nodeid)) {
5174  
5175  			/* tell recover_lvb to invalidate the lvb
5176  			   because a node holding EX/PW failed */
5177  			if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5178  			    (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5179  				rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5180  			}
5181  
5182  			del_lkb(r, lkb);
5183  
5184  			/* this put should free the lkb */
5185  			if (!dlm_put_lkb(lkb))
5186  				log_error(ls, "purged dead lkb not released");
5187  
5188  			rsb_set_flag(r, RSB_RECOVER_GRANT);
5189  
5190  			(*count)++;
5191  		}
5192  	}
5193  }
5194  
5195  /* Get rid of locks held by nodes that are gone. */
5196  
5197  void dlm_recover_purge(struct dlm_ls *ls)
5198  {
5199  	struct dlm_rsb *r;
5200  	struct dlm_member *memb;
5201  	int nodes_count = 0;
5202  	int nodeid_gone = 0;
5203  	unsigned int lkb_count = 0;
5204  
5205  	/* cache one removed nodeid to optimize the common
5206  	   case of a single node removed */
5207  
5208  	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5209  		nodes_count++;
5210  		nodeid_gone = memb->nodeid;
5211  	}
5212  
5213  	if (!nodes_count)
5214  		return;
5215  
5216  	down_write(&ls->ls_root_sem);
5217  	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
5218  		hold_rsb(r);
5219  		lock_rsb(r);
5220  		if (is_master(r)) {
5221  			purge_dead_list(ls, r, &r->res_grantqueue,
5222  					nodeid_gone, &lkb_count);
5223  			purge_dead_list(ls, r, &r->res_convertqueue,
5224  					nodeid_gone, &lkb_count);
5225  			purge_dead_list(ls, r, &r->res_waitqueue,
5226  					nodeid_gone, &lkb_count);
5227  		}
5228  		unlock_rsb(r);
5229  		unhold_rsb(r);
5230  		cond_resched();
5231  	}
5232  	up_write(&ls->ls_root_sem);
5233  
5234  	if (lkb_count)
5235  		log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5236  			  lkb_count, nodes_count);
5237  }
5238  
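/* Scan one hash bucket for an rsb flagged RECOVER_GRANT that we master and
   return it held; the flag is cleared on rsbs we no longer master. */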
5239  static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
5240  {
5241  	struct rb_node *n;
5242  	struct dlm_rsb *r;
5243  
5244  	spin_lock(&ls->ls_rsbtbl[bucket].lock);
5245  	for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
5246  		r = rb_entry(n, struct dlm_rsb, res_hashnode);
5247  
5248  		if (!rsb_flag(r, RSB_RECOVER_GRANT))
5249  			continue;
5250  		if (!is_master(r)) {
5251  			rsb_clear_flag(r, RSB_RECOVER_GRANT);
5252  			continue;
5253  		}
5254  		hold_rsb(r);
5255  		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5256  		return r;
5257  	}
5258  	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
5259  	return NULL;
5260  }
5261  
5262  /*
5263   * Attempt to grant locks on resources that we are the master of.
5264   * Locks may have become grantable during recovery because locks
5265   * from departed nodes have been purged (or not rebuilt), allowing
5266   * previously blocked locks to now be granted.  The subset of rsb's
5267   * we are interested in are those with lkb's on either the convert or
5268   * waiting queues.
5269   *
5270   * Simplest would be to go through each master rsb and check for non-empty
5271   * convert or waiting queues, and attempt to grant on those rsbs.
5272   * Checking the queues requires lock_rsb, though, for which we'd need
5273   * to release the rsbtbl lock.  This would make iterating through all
5274   * rsb's very inefficient.  So, we rely on earlier recovery routines
5275   * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5276   * locks for.
5277   */
5278  
5279  void dlm_recover_grant(struct dlm_ls *ls)
5280  {
5281  	struct dlm_rsb *r;
5282  	int bucket = 0;
5283  	unsigned int count = 0;
5284  	unsigned int rsb_count = 0;
5285  	unsigned int lkb_count = 0;
5286  
5287  	while (1) {
5288  		r = find_grant_rsb(ls, bucket);
5289  		if (!r) {
5290  			if (bucket == ls->ls_rsbtbl_size - 1)
5291  				break;
5292  			bucket++;
5293  			continue;
5294  		}
5295  		rsb_count++;
5296  		count = 0;
5297  		lock_rsb(r);
5298  		/* the RECOVER_GRANT flag is checked in the grant path */
5299  		grant_pending_locks(r, &count);
5300  		rsb_clear_flag(r, RSB_RECOVER_GRANT);
5301  		lkb_count += count;
5302  		confirm_master(r, 0);
5303  		unlock_rsb(r);
5304  		put_rsb(r);
5305  		cond_resched();
5306  	}
5307  
5308  	if (lkb_count)
5309  		log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5310  			  lkb_count, rsb_count);
5311  }
5312  
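/* Find an existing master-copy lkb by owning nodeid and remote lock id;
   search_remid() checks all three rsb queues so a lock already rebuilt by
   an earlier, aborted recovery is not added twice. */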
5313  static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5314  					 uint32_t remid)
5315  {
5316  	struct dlm_lkb *lkb;
5317  
5318  	list_for_each_entry(lkb, head, lkb_statequeue) {
5319  		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5320  			return lkb;
5321  	}
5322  	return NULL;
5323  }
5324  
5325  static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5326  				    uint32_t remid)
5327  {
5328  	struct dlm_lkb *lkb;
5329  
5330  	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5331  	if (lkb)
5332  		return lkb;
5333  	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5334  	if (lkb)
5335  		return lkb;
5336  	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5337  	if (lkb)
5338  		return lkb;
5339  	return NULL;
5340  }
5341  
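/* Unpack the rcom_lock carried in rc->rc_buf into the lkb that will hold the
   master copy of the remote node's lock.  Fields arrive in little-endian wire
   format; any lvb data follows the fixed-size struct, so its length is
   whatever remains of the message. */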
5342  /* needs at least dlm_rcom + rcom_lock */
5343  static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5344  				  struct dlm_rsb *r, const struct dlm_rcom *rc)
5345  {
5346  	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5347  
5348  	lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5349  	lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5350  	lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5351  	lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5352  	dlm_set_dflags_val(lkb, le32_to_cpu(rl->rl_flags));
5353  	set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
5354  	lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5355  	lkb->lkb_rqmode = rl->rl_rqmode;
5356  	lkb->lkb_grmode = rl->rl_grmode;
5357  	/* don't set lkb_status because add_lkb wants to set it itself */
5358  
5359  	lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5360  	lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5361  
5362  	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5363  		int lvblen = le16_to_cpu(rc->rc_header.h_length) -
5364  			sizeof(struct dlm_rcom) - sizeof(struct rcom_lock);
5365  		if (lvblen > ls->ls_lvblen)
5366  			return -EINVAL;
5367  		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5368  		if (!lkb->lkb_lvbptr)
5369  			return -ENOMEM;
5370  		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5371  	}
5372  
5373  	/* Conversions between PR and CW (middle modes) need special handling.
5374  	   The real granted mode of these converting locks cannot be determined
5375  	   until all locks have been rebuilt on the rsb (recover_conversion) */
5376  
5377  	if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
5378  	    middle_conversion(lkb)) {
5379  		rl->rl_status = DLM_LKSTS_CONVERT;
5380  		lkb->lkb_grmode = DLM_LOCK_IV;
5381  		rsb_set_flag(r, RSB_RECOVER_CONVERT);
5382  	}
5383  
5384  	return 0;
5385  }
5386  
5387  /* This lkb may have been recovered in a previous aborted recovery so we need
5388     to check if the rsb already has an lkb with the given remote nodeid/lkid.
5389     If so we just send back a standard reply.  If not, we create a new lkb with
5390     the given values and send back our lkid.  We send back our lkid by sending
5391     back the rcom_lock struct we got but with the remid field filled in. */
5392  
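/* A rough sketch of the exchange (L = node rebuilding its locks on the new
   master, R = the new master):

   L: dlm_send_rcom_lock()        ->  R: dlm_recover_master_copy()
   L: dlm_recover_process_copy()  <-  R: reply carrying rl_remid, rl_result
*/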
5393  /* needs at least dlm_rcom + rcom_lock */
5394  int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5395  			    __le32 *rl_remid, __le32 *rl_result)
5396  {
5397  	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5398  	struct dlm_rsb *r;
5399  	struct dlm_lkb *lkb;
5400  	uint32_t remid = 0;
5401  	int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5402  	int error;
5403  
5404  	/* default the returned rl_remid to the value carried in the rcom_lock */
5405  	*rl_remid = rl->rl_remid;
5406  
5407  	if (rl->rl_parent_lkid) {
5408  		error = -EOPNOTSUPP;
5409  		goto out;
5410  	}
5411  
5412  	remid = le32_to_cpu(rl->rl_lkid);
5413  
5414  	/* In general we expect the rsb returned to be R_MASTER, but we don't
5415  	   have to require it.  Recovery of masters on one node can overlap
5416  	   recovery of locks on another node, so one node can send us MSTCPY
5417  	   locks before we've made ourselves master of this rsb.  We can still
5418  	   add new MSTCPY locks that we receive here without any harm; when
5419  	   we make ourselves master, dlm_recover_masters() won't touch the
5420  	   MSTCPY locks we've received early. */
5421  
5422  	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5423  			 from_nodeid, R_RECEIVE_RECOVER, &r);
5424  	if (error)
5425  		goto out;
5426  
5427  	lock_rsb(r);
5428  
5429  	if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5430  		log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5431  			  from_nodeid, remid);
5432  		error = -EBADR;
5433  		goto out_unlock;
5434  	}
5435  
5436  	lkb = search_remid(r, from_nodeid, remid);
5437  	if (lkb) {
5438  		error = -EEXIST;
5439  		goto out_remid;
5440  	}
5441  
5442  	error = create_lkb(ls, &lkb);
5443  	if (error)
5444  		goto out_unlock;
5445  
5446  	error = receive_rcom_lock_args(ls, lkb, r, rc);
5447  	if (error) {
5448  		__put_lkb(ls, lkb);
5449  		goto out_unlock;
5450  	}
5451  
5452  	attach_lkb(r, lkb);
5453  	add_lkb(r, lkb, rl->rl_status);
5454  	ls->ls_recover_locks_in++;
5455  
5456  	if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5457  		rsb_set_flag(r, RSB_RECOVER_GRANT);
5458  
5459   out_remid:
5460  	/* this is the new value returned to the lock holder for
5461  	   saving in its process-copy lkb */
5462  	*rl_remid = cpu_to_le32(lkb->lkb_id);
5463  
5464  	lkb->lkb_recover_seq = ls->ls_recover_seq;
5465  
5466   out_unlock:
5467  	unlock_rsb(r);
5468  	put_rsb(r);
5469   out:
5470  	if (error && error != -EEXIST)
5471  		log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5472  			  from_nodeid, remid, error);
5473  	*rl_result = cpu_to_le32(error);
5474  	return error;
5475  }
5476  
5477  /* needs at least dlm_rcom + rcom_lock */
5478  int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5479  			     uint64_t seq)
5480  {
5481  	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5482  	struct dlm_rsb *r;
5483  	struct dlm_lkb *lkb;
5484  	uint32_t lkid, remid;
5485  	int error, result;
5486  
5487  	lkid = le32_to_cpu(rl->rl_lkid);
5488  	remid = le32_to_cpu(rl->rl_remid);
5489  	result = le32_to_cpu(rl->rl_result);
5490  
5491  	error = find_lkb(ls, lkid, &lkb);
5492  	if (error) {
5493  		log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5494  			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5495  			  result);
5496  		return error;
5497  	}
5498  
5499  	r = lkb->lkb_resource;
5500  	hold_rsb(r);
5501  	lock_rsb(r);
5502  
5503  	if (!is_process_copy(lkb)) {
5504  		log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5505  			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5506  			  result);
5507  		dlm_dump_rsb(r);
5508  		unlock_rsb(r);
5509  		put_rsb(r);
5510  		dlm_put_lkb(lkb);
5511  		return -EINVAL;
5512  	}
5513  
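	/* result is the rl_result set by dlm_recover_master_copy() on the new
	   master: 0 or -EEXIST mean our lock now exists there and remid is its
	   lkid on that node. */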
5514  	switch (result) {
5515  	case -EBADR:
5516  		/* There's a chance the new master received our lock before
5517  		   dlm_recover_master_reply(); this wouldn't happen if we did
5518  		   a barrier between recover_masters and recover_locks. */
5519  
5520  		log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5521  			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5522  			  result);
5523  
5524  		dlm_send_rcom_lock(r, lkb, seq);
5525  		goto out;
5526  	case -EEXIST:
5527  	case 0:
5528  		lkb->lkb_remid = remid;
5529  		break;
5530  	default:
5531  		log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5532  			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5533  			  result);
5534  	}
5535  
5536  	/* an ack for dlm_recover_locks(), which waits for a reply for each
5537  	   lock it sends to new masters */
5538  	dlm_recovered_lock(r);
5539   out:
5540  	unlock_rsb(r);
5541  	put_rsb(r);
5542  	dlm_put_lkb(lkb);
5543  
5544  	return 0;
5545  }
5546  
5547  int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5548  		     int mode, uint32_t flags, void *name, unsigned int namelen)
5549  {
5550  	struct dlm_lkb *lkb;
5551  	struct dlm_args args;
5552  	bool do_put = true;
5553  	int error;
5554  
5555  	dlm_lock_recovery(ls);
5556  
5557  	error = create_lkb(ls, &lkb);
5558  	if (error) {
5559  		kfree(ua);
5560  		goto out;
5561  	}
5562  
5563  	trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
5564  
5565  	if (flags & DLM_LKF_VALBLK) {
5566  		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5567  		if (!ua->lksb.sb_lvbptr) {
5568  			kfree(ua);
5569  			error = -ENOMEM;
5570  			goto out_put;
5571  		}
5572  	}
5573  	error = set_lock_args(mode, &ua->lksb, flags, namelen, fake_astfn, ua,
5574  			      fake_bastfn, &args);
5575  	if (error) {
5576  		kfree(ua->lksb.sb_lvbptr);
5577  		ua->lksb.sb_lvbptr = NULL;
5578  		kfree(ua);
5579  		goto out_put;
5580  	}
5581  
5582  	/* After ua is attached to lkb it will be freed by dlm_free_lkb().
5583  	   When DLM_DFL_USER_BIT is set, the dlm knows that this is a userspace
5584  	   lock and that lkb_astparam is the dlm_user_args structure. */
5585  	set_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags);
5586  	error = request_lock(ls, lkb, name, namelen, &args);
5587  
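	/* 0 and -EINPROGRESS leave the request in flight, so the lkb is added
	   to the per-process list below; -EAGAIN is masked to 0 (the result
	   reaches the user through its completion ast) but still falls through
	   to the failure path, where the lkb is dropped like any other error. */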
5588  	switch (error) {
5589  	case 0:
5590  		break;
5591  	case -EINPROGRESS:
5592  		error = 0;
5593  		break;
5594  	case -EAGAIN:
5595  		error = 0;
5596  		fallthrough;
5597  	default:
5598  		goto out_put;
5599  	}
5600  
5601  	/* add this new lkb to the per-process list of locks */
5602  	spin_lock(&ua->proc->locks_spin);
5603  	hold_lkb(lkb);
5604  	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5605  	spin_unlock(&ua->proc->locks_spin);
5606  	do_put = false;
5607   out_put:
5608  	trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
5609  	if (do_put)
5610  		__put_lkb(ls, lkb);
5611   out:
5612  	dlm_unlock_recovery(ls);
5613  	return error;
5614  }
5615  
5616  int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5617  		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
5618  {
5619  	struct dlm_lkb *lkb;
5620  	struct dlm_args args;
5621  	struct dlm_user_args *ua;
5622  	int error;
5623  
5624  	dlm_lock_recovery(ls);
5625  
5626  	error = find_lkb(ls, lkid, &lkb);
5627  	if (error)
5628  		goto out;
5629  
5630  	trace_dlm_lock_start(ls, lkb, NULL, 0, mode, flags);
5631  
5632  	/* user can change the params on its lock when it converts it, or
5633  	   add an lvb that didn't exist before */
5634  
5635  	ua = lkb->lkb_ua;
5636  
5637  	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5638  		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5639  		if (!ua->lksb.sb_lvbptr) {
5640  			error = -ENOMEM;
5641  			goto out_put;
5642  		}
5643  	}
5644  	if (lvb_in && ua->lksb.sb_lvbptr)
5645  		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5646  
5647  	ua->xid = ua_tmp->xid;
5648  	ua->castparam = ua_tmp->castparam;
5649  	ua->castaddr = ua_tmp->castaddr;
5650  	ua->bastparam = ua_tmp->bastparam;
5651  	ua->bastaddr = ua_tmp->bastaddr;
5652  	ua->user_lksb = ua_tmp->user_lksb;
5653  
5654  	error = set_lock_args(mode, &ua->lksb, flags, 0, fake_astfn, ua,
5655  			      fake_bastfn, &args);
5656  	if (error)
5657  		goto out_put;
5658  
5659  	error = convert_lock(ls, lkb, &args);
5660  
5661  	if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5662  		error = 0;
5663   out_put:
5664  	trace_dlm_lock_end(ls, lkb, NULL, 0, mode, flags, error, false);
5665  	dlm_put_lkb(lkb);
5666   out:
5667  	dlm_unlock_recovery(ls);
5668  	kfree(ua_tmp);
5669  	return error;
5670  }
5671  
5672  /*
5673   * The caller asks for an orphan lock on a given resource with a given mode.
5674   * If a matching lock exists, it's moved to the owner's list of locks and
5675   * the lkid is returned.
5676   */
5677  
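/* Return values: -EAGAIN means an orphan exists for this resource but with a
   different granted mode; -ENOENT means no matching orphan was found at all. */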
5678  int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5679  		     int mode, uint32_t flags, void *name, unsigned int namelen,
5680  		     uint32_t *lkid)
5681  {
5682  	struct dlm_lkb *lkb = NULL, *iter;
5683  	struct dlm_user_args *ua;
5684  	int found_other_mode = 0;
5685  	int rv = 0;
5686  
5687  	mutex_lock(&ls->ls_orphans_mutex);
5688  	list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
5689  		if (iter->lkb_resource->res_length != namelen)
5690  			continue;
5691  		if (memcmp(iter->lkb_resource->res_name, name, namelen))
5692  			continue;
5693  		if (iter->lkb_grmode != mode) {
5694  			found_other_mode = 1;
5695  			continue;
5696  		}
5697  
5698  		lkb = iter;
5699  		list_del_init(&iter->lkb_ownqueue);
5700  		clear_bit(DLM_DFL_ORPHAN_BIT, &iter->lkb_dflags);
5701  		*lkid = iter->lkb_id;
5702  		break;
5703  	}
5704  	mutex_unlock(&ls->ls_orphans_mutex);
5705  
5706  	if (!lkb && found_other_mode) {
5707  		rv = -EAGAIN;
5708  		goto out;
5709  	}
5710  
5711  	if (!lkb) {
5712  		rv = -ENOENT;
5713  		goto out;
5714  	}
5715  
5716  	lkb->lkb_exflags = flags;
5717  	lkb->lkb_ownpid = (int) current->pid;
5718  
5719  	ua = lkb->lkb_ua;
5720  
5721  	ua->proc = ua_tmp->proc;
5722  	ua->xid = ua_tmp->xid;
5723  	ua->castparam = ua_tmp->castparam;
5724  	ua->castaddr = ua_tmp->castaddr;
5725  	ua->bastparam = ua_tmp->bastparam;
5726  	ua->bastaddr = ua_tmp->bastaddr;
5727  	ua->user_lksb = ua_tmp->user_lksb;
5728  
5729  	/*
5730  	 * The lkb reference from the ls_orphans list was not
5731  	 * removed above, and is now considered the reference
5732  	 * for the proc locks list.
5733  	 */
5734  
5735  	spin_lock(&ua->proc->locks_spin);
5736  	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5737  	spin_unlock(&ua->proc->locks_spin);
5738   out:
5739  	kfree(ua_tmp);
5740  	return rv;
5741  }
5742  
5743  int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5744  		    uint32_t flags, uint32_t lkid, char *lvb_in)
5745  {
5746  	struct dlm_lkb *lkb;
5747  	struct dlm_args args;
5748  	struct dlm_user_args *ua;
5749  	int error;
5750  
5751  	dlm_lock_recovery(ls);
5752  
5753  	error = find_lkb(ls, lkid, &lkb);
5754  	if (error)
5755  		goto out;
5756  
5757  	trace_dlm_unlock_start(ls, lkb, flags);
5758  
5759  	ua = lkb->lkb_ua;
5760  
5761  	if (lvb_in && ua->lksb.sb_lvbptr)
5762  		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5763  	if (ua_tmp->castparam)
5764  		ua->castparam = ua_tmp->castparam;
5765  	ua->user_lksb = ua_tmp->user_lksb;
5766  
5767  	error = set_unlock_args(flags, ua, &args);
5768  	if (error)
5769  		goto out_put;
5770  
5771  	error = unlock_lock(ls, lkb, &args);
5772  
5773  	if (error == -DLM_EUNLOCK)
5774  		error = 0;
5775  	/* from validate_unlock_args() */
5776  	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5777  		error = 0;
5778  	if (error)
5779  		goto out_put;
5780  
5781  	spin_lock(&ua->proc->locks_spin);
5782  	/* dlm_user_add_cb() may have already taken lkb off the proc list */
5783  	if (!list_empty(&lkb->lkb_ownqueue))
5784  		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5785  	spin_unlock(&ua->proc->locks_spin);
5786   out_put:
5787  	trace_dlm_unlock_end(ls, lkb, flags, error);
5788  	dlm_put_lkb(lkb);
5789   out:
5790  	dlm_unlock_recovery(ls);
5791  	kfree(ua_tmp);
5792  	return error;
5793  }
5794  
5795  int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5796  		    uint32_t flags, uint32_t lkid)
5797  {
5798  	struct dlm_lkb *lkb;
5799  	struct dlm_args args;
5800  	struct dlm_user_args *ua;
5801  	int error;
5802  
5803  	dlm_lock_recovery(ls);
5804  
5805  	error = find_lkb(ls, lkid, &lkb);
5806  	if (error)
5807  		goto out;
5808  
5809  	trace_dlm_unlock_start(ls, lkb, flags);
5810  
5811  	ua = lkb->lkb_ua;
5812  	if (ua_tmp->castparam)
5813  		ua->castparam = ua_tmp->castparam;
5814  	ua->user_lksb = ua_tmp->user_lksb;
5815  
5816  	error = set_unlock_args(flags, ua, &args);
5817  	if (error)
5818  		goto out_put;
5819  
5820  	error = cancel_lock(ls, lkb, &args);
5821  
5822  	if (error == -DLM_ECANCEL)
5823  		error = 0;
5824  	/* from validate_unlock_args() */
5825  	if (error == -EBUSY)
5826  		error = 0;
5827   out_put:
5828  	trace_dlm_unlock_end(ls, lkb, flags, error);
5829  	dlm_put_lkb(lkb);
5830   out:
5831  	dlm_unlock_recovery(ls);
5832  	kfree(ua_tmp);
5833  	return error;
5834  }
5835  
5836  int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
5837  {
5838  	struct dlm_lkb *lkb;
5839  	struct dlm_args args;
5840  	struct dlm_user_args *ua;
5841  	struct dlm_rsb *r;
5842  	int error;
5843  
5844  	dlm_lock_recovery(ls);
5845  
5846  	error = find_lkb(ls, lkid, &lkb);
5847  	if (error)
5848  		goto out;
5849  
5850  	trace_dlm_unlock_start(ls, lkb, flags);
5851  
5852  	ua = lkb->lkb_ua;
5853  
5854  	error = set_unlock_args(flags, ua, &args);
5855  	if (error)
5856  		goto out_put;
5857  
5858  	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
5859  
5860  	r = lkb->lkb_resource;
5861  	hold_rsb(r);
5862  	lock_rsb(r);
5863  
5864  	error = validate_unlock_args(lkb, &args);
5865  	if (error)
5866  		goto out_r;
5867  	set_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags);
5868  
5869  	error = _cancel_lock(r, lkb);
5870   out_r:
5871  	unlock_rsb(r);
5872  	put_rsb(r);
5873  
5874  	if (error == -DLM_ECANCEL)
5875  		error = 0;
5876  	/* from validate_unlock_args() */
5877  	if (error == -EBUSY)
5878  		error = 0;
5879   out_put:
5880  	trace_dlm_unlock_end(ls, lkb, flags, error);
5881  	dlm_put_lkb(lkb);
5882   out:
5883  	dlm_unlock_recovery(ls);
5884  	return error;
5885  }
5886  
5887  /* lkb's that are removed from the waiters list by revert are just left on the
5888     orphans list with the granted orphan locks, to be freed by purge */
5889  
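/* Move a persistent lock onto the lockspace orphan list and cancel any
   request or conversion still in progress for it; the granted lock itself
   stays in place until it is adopted or purged. */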
5890  static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5891  {
5892  	struct dlm_args args;
5893  	int error;
5894  
5895  	hold_lkb(lkb); /* reference for the ls_orphans list */
5896  	mutex_lock(&ls->ls_orphans_mutex);
5897  	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
5898  	mutex_unlock(&ls->ls_orphans_mutex);
5899  
5900  	set_unlock_args(0, lkb->lkb_ua, &args);
5901  
5902  	error = cancel_lock(ls, lkb, &args);
5903  	if (error == -DLM_ECANCEL)
5904  		error = 0;
5905  	return error;
5906  }
5907  
5908  /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
5909     granted.  Regardless of what rsb queue the lock is on, it's removed and
5910     freed.  The IVVALBLK flag causes the lvb on the resource to be invalidated
5911     if our lock is PW/EX (it's ignored if our granted mode is smaller). */
5912  
5913  static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
5914  {
5915  	struct dlm_args args;
5916  	int error;
5917  
5918  	set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
5919  			lkb->lkb_ua, &args);
5920  
5921  	error = unlock_lock(ls, lkb, &args);
5922  	if (error == -DLM_EUNLOCK)
5923  		error = 0;
5924  	return error;
5925  }
5926  
5927  /* We have to release ls_clear_proc_locks before calling unlock_proc_lock()
5928     (which does lock_rsb) due to deadlock with receiving a message that does
5929     lock_rsb followed by dlm_user_add_cb() */
5930  
5931  static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
5932  				     struct dlm_user_proc *proc)
5933  {
5934  	struct dlm_lkb *lkb = NULL;
5935  
5936  	spin_lock(&ls->ls_clear_proc_locks);
5937  	if (list_empty(&proc->locks))
5938  		goto out;
5939  
5940  	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
5941  	list_del_init(&lkb->lkb_ownqueue);
5942  
5943  	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5944  		set_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags);
5945  	else
5946  		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
5947   out:
5948  	spin_unlock(&ls->ls_clear_proc_locks);
5949  	return lkb;
5950  }
5951  
5952  /* The ls_clear_proc_locks lock protects against dlm_user_add_cb() which
5953     1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
5954     which we clear here. */
5955  
5956  /* proc CLOSING flag is set so no more device_reads should look at proc->asts
5957     list, and no more device_writes should add lkb's to proc->locks list; so we
5958     shouldn't need to take asts_spin or locks_spin here.  this assumes that
5959     device reads/writes/closes are serialized -- FIXME: we may need to serialize
5960     them ourselves. */
5961  
5962  void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5963  {
5964  	struct dlm_lkb *lkb, *safe;
5965  
5966  	dlm_lock_recovery(ls);
5967  
5968  	while (1) {
5969  		lkb = del_proc_lock(ls, proc);
5970  		if (!lkb)
5971  			break;
5972  		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5973  			orphan_proc_lock(ls, lkb);
5974  		else
5975  			unlock_proc_lock(ls, lkb);
5976  
5977  		/* this removes the reference for the proc->locks list
5978  		   added by dlm_user_request; it may result in the lkb
5979  		   being freed */
5980  
5981  		dlm_put_lkb(lkb);
5982  	}
5983  
5984  	spin_lock(&ls->ls_clear_proc_locks);
5985  
5986  	/* in-progress unlocks */
5987  	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
5988  		list_del_init(&lkb->lkb_ownqueue);
5989  		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
5990  		dlm_put_lkb(lkb);
5991  	}
5992  
5993  	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
5994  		dlm_purge_lkb_callbacks(lkb);
5995  		list_del_init(&lkb->lkb_cb_list);
5996  		dlm_put_lkb(lkb);
5997  	}
5998  
5999  	spin_unlock(&ls->ls_clear_proc_locks);
6000  	dlm_unlock_recovery(ls);
6001  }
6002  
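/* Like dlm_clear_proc_locks(), but used when a still-running process purges
   its own locks: entries are taken off proc->locks under locks_spin and are
   always force-unlocked; persistent locks are not turned into orphans here. */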
6003  static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6004  {
6005  	struct dlm_lkb *lkb, *safe;
6006  
6007  	while (1) {
6008  		lkb = NULL;
6009  		spin_lock(&proc->locks_spin);
6010  		if (!list_empty(&proc->locks)) {
6011  			lkb = list_entry(proc->locks.next, struct dlm_lkb,
6012  					 lkb_ownqueue);
6013  			list_del_init(&lkb->lkb_ownqueue);
6014  		}
6015  		spin_unlock(&proc->locks_spin);
6016  
6017  		if (!lkb)
6018  			break;
6019  
6020  		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6021  		unlock_proc_lock(ls, lkb);
6022  		dlm_put_lkb(lkb); /* ref from proc->locks list */
6023  	}
6024  
6025  	spin_lock(&proc->locks_spin);
6026  	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6027  		list_del_init(&lkb->lkb_ownqueue);
6028  		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6029  		dlm_put_lkb(lkb);
6030  	}
6031  	spin_unlock(&proc->locks_spin);
6032  
6033  	spin_lock(&proc->asts_spin);
6034  	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
6035  		dlm_purge_lkb_callbacks(lkb);
6036  		list_del_init(&lkb->lkb_cb_list);
6037  		dlm_put_lkb(lkb);
6038  	}
6039  	spin_unlock(&proc->asts_spin);
6040  }
6041  
6042  /* pid of 0 means purge all orphans */
6043  
6044  static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6045  {
6046  	struct dlm_lkb *lkb, *safe;
6047  
6048  	mutex_lock(&ls->ls_orphans_mutex);
6049  	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6050  		if (pid && lkb->lkb_ownpid != pid)
6051  			continue;
6052  		unlock_proc_lock(ls, lkb);
6053  		list_del_init(&lkb->lkb_ownqueue);
6054  		dlm_put_lkb(lkb);
6055  	}
6056  	mutex_unlock(&ls->ls_orphans_mutex);
6057  }
6058  
6059  static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6060  {
6061  	struct dlm_message *ms;
6062  	struct dlm_mhandle *mh;
6063  	int error;
6064  
6065  	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6066  				DLM_MSG_PURGE, &ms, &mh, GFP_NOFS);
6067  	if (error)
6068  		return error;
6069  	ms->m_nodeid = cpu_to_le32(nodeid);
6070  	ms->m_pid = cpu_to_le32(pid);
6071  
6072  	return send_message(mh, ms, NULL, 0);
6073  }
6074  
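/* A purge aimed at another node is forwarded as a DLM_MSG_PURGE message; a
   local purge of the caller's own pid drops its active proc locks, while any
   other pid (or 0) drops matching orphans via do_purge(). */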
6075  int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6076  		   int nodeid, int pid)
6077  {
6078  	int error = 0;
6079  
6080  	if (nodeid && (nodeid != dlm_our_nodeid())) {
6081  		error = send_purge(ls, nodeid, pid);
6082  	} else {
6083  		dlm_lock_recovery(ls);
6084  		if (pid == current->pid)
6085  			purge_proc_locks(ls, proc);
6086  		else
6087  			do_purge(ls, nodeid, pid);
6088  		dlm_unlock_recovery(ls);
6089  	}
6090  	return error;
6091  }
6092  
6093  /* debug functionality */
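/* dlm_debug_add_lkb() fabricates an lkb in the given state and attaches it to
   the named rsb, bypassing normal request processing: no asts are queued and
   no grant decisions are made. */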
6094  int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
6095  		      int lkb_nodeid, unsigned int lkb_dflags, int lkb_status)
6096  {
6097  	struct dlm_lksb *lksb;
6098  	struct dlm_lkb *lkb;
6099  	struct dlm_rsb *r;
6100  	int error;
6101  
6102  	/* we currently can't set a valid user lock */
6103  	if (lkb_dflags & BIT(DLM_DFL_USER_BIT))
6104  		return -EOPNOTSUPP;
6105  
6106  	lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
6107  	if (!lksb)
6108  		return -ENOMEM;
6109  
6110  	error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
6111  	if (error) {
6112  		kfree(lksb);
6113  		return error;
6114  	}
6115  
6116  	dlm_set_dflags_val(lkb, lkb_dflags);
6117  	lkb->lkb_nodeid = lkb_nodeid;
6118  	lkb->lkb_lksb = lksb;
6119  	/* user-specific pointer; just don't leave it NULL for kernel locks */
6120  	if (~lkb_dflags & BIT(DLM_DFL_USER_BIT))
6121  		lkb->lkb_astparam = (void *)0xDEADBEEF;
6122  
6123  	error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
6124  	if (error) {
6125  		kfree(lksb);
6126  		__put_lkb(ls, lkb);
6127  		return error;
6128  	}
6129  
6130  	lock_rsb(r);
6131  	attach_lkb(r, lkb);
6132  	add_lkb(r, lkb, lkb_status);
6133  	unlock_rsb(r);
6134  	put_rsb(r);
6135  
6136  	return 0;
6137  }
6138  
6139  int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
6140  				 int mstype, int to_nodeid)
6141  {
6142  	struct dlm_lkb *lkb;
6143  	int error;
6144  
6145  	error = find_lkb(ls, lkb_id, &lkb);
6146  	if (error)
6147  		return error;
6148  
6149  	error = add_to_waiters(lkb, mstype, to_nodeid);
6150  	dlm_put_lkb(lkb);
6151  	return error;
6152  }
6153  
6154