xref: /linux/fs/dlm/lock.c (revision 3d3a9c8b89d4f8a3785e06ffd15405c670696f02)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
6 **
7 **
8 *******************************************************************************
9 ******************************************************************************/
10 
11 /* Central locking logic has four stages:
12 
13    dlm_lock()
14    dlm_unlock()
15 
16    request_lock(ls, lkb)
17    convert_lock(ls, lkb)
18    unlock_lock(ls, lkb)
19    cancel_lock(ls, lkb)
20 
21    _request_lock(r, lkb)
22    _convert_lock(r, lkb)
23    _unlock_lock(r, lkb)
24    _cancel_lock(r, lkb)
25 
26    do_request(r, lkb)
27    do_convert(r, lkb)
28    do_unlock(r, lkb)
29    do_cancel(r, lkb)
30 
31    Stage 1 (lock, unlock) is mainly about checking input args and
32    splitting into one of the four main operations:
33 
34        dlm_lock          = request_lock
35        dlm_lock+CONVERT  = convert_lock
36        dlm_unlock        = unlock_lock
37        dlm_unlock+CANCEL = cancel_lock
38 
39    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
40    provided to the next stage.
41 
42    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
43    When remote, it calls send_xxxx(), when local it calls do_xxxx().
44 
45    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
46    given rsb and lkb and queues callbacks.
47 
48    For remote operations, send_xxxx() results in the corresponding do_xxxx()
49    function being executed on the remote node.  The connecting send/receive
50    calls on local (L) and remote (R) nodes:
51 
52    L: send_xxxx()              ->  R: receive_xxxx()
53                                    R: do_xxxx()
54    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
55 */
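/* For example (following the stages above), a new request on a locally
 * mastered resource runs dlm_lock() -> request_lock() -> _request_lock()
 * -> do_request() entirely on this node.  The same request on a remotely
 * mastered resource stops at send_request() in stage 3; the master node
 * then runs receive_request() -> do_request() -> send_request_reply(),
 * and this node finishes up in receive_request_reply().
 */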
56 #include <trace/events/dlm.h>
57 
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "midcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77 
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89 				    const struct dlm_message *ms, bool local);
90 static int receive_extralen(const struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void deactivate_rsb(struct kref *kref);
93 
94 /*
95  * Lock compatibility matrix - thanks Steve
96  * UN = Unlocked state. Not really a state, used as a flag
97  * PD = Padding. Used to make the matrix a nice power of two in size
98  * Other states are the same as the VMS DLM.
99  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
100  */
101 
102 static const int __dlm_compat_matrix[8][8] = {
103       /* UN NL CR CW PR PW EX PD */
104         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
105         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
106         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
107         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
108         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
109         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
110         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
111         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
112 };
113 
114 /*
115  * This defines the direction of transfer of LVB data.
116  * Granted mode is the row; requested mode is the column.
117  * Usage: matrix[grmode+1][rqmode+1]
118  * 1 = LVB is returned to the caller
119  * 0 = LVB is written to the resource
120  * -1 = nothing happens to the LVB
121  */
122 
123 const int dlm_lvb_operations[8][8] = {
124         /* UN   NL  CR  CW  PR  PW  EX  PD*/
125         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
126         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
127         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
128         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
129         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
130         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
131         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
132         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
133 };
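/* For example, an up-conversion from CR to EX has
 * dlm_lvb_operations[DLM_LOCK_CR + 1][DLM_LOCK_EX + 1] == 1, so the
 * resource LVB is returned to the caller, while a down-conversion from
 * EX to NL has dlm_lvb_operations[DLM_LOCK_EX + 1][DLM_LOCK_NL + 1] == 0,
 * so the caller's LVB is written to the resource.
 */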
134 
135 #define modes_compat(gr, rq) \
136 	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
137 
138 int dlm_modes_compat(int mode1, int mode2)
139 {
140 	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
141 }
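/* For example, dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR) is 1 (two
 * protected-read locks can be granted together), while
 * dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_EX) and
 * dlm_modes_compat(DLM_LOCK_CW, DLM_LOCK_PR) are 0.  NL is compatible
 * with every mode except the PD padding entry.
 */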
142 
143 /*
144  * Compatibility matrix for conversions with QUECVT set.
145  * Granted mode is the row; requested mode is the column.
146  * Usage: matrix[grmode+1][rqmode+1]
147  */
148 
149 static const int __quecvt_compat_matrix[8][8] = {
150       /* UN NL CR CW PR PW EX PD */
151         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
152         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
153         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
154         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
155         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
156         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
157         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
158         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
159 };
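/* For example, __quecvt_compat_matrix[DLM_LOCK_NL + 1][DLM_LOCK_EX + 1]
 * is 1, so a QUECVT conversion from NL up to EX is subject to the QUECVT
 * queueing rules, while every entry in the EX row is 0, so QUECVT never
 * applies to a conversion away from EX.
 */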
160 
161 void dlm_print_lkb(struct dlm_lkb *lkb)
162 {
163 	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
164 	       "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
165 	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166 	       dlm_iflags_val(lkb), lkb->lkb_status, lkb->lkb_rqmode,
167 	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
168 	       (unsigned long long)lkb->lkb_recover_seq);
169 }
170 
171 static void dlm_print_rsb(struct dlm_rsb *r)
172 {
173 	printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
174 	       "rlc %d name %s\n",
175 	       r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
176 	       r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
177 	       r->res_name);
178 }
179 
180 void dlm_dump_rsb(struct dlm_rsb *r)
181 {
182 	struct dlm_lkb *lkb;
183 
184 	dlm_print_rsb(r);
185 
186 	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
187 	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
188 	printk(KERN_ERR "rsb lookup list\n");
189 	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
190 		dlm_print_lkb(lkb);
191 	printk(KERN_ERR "rsb grant queue:\n");
192 	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
193 		dlm_print_lkb(lkb);
194 	printk(KERN_ERR "rsb convert queue:\n");
195 	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
196 		dlm_print_lkb(lkb);
197 	printk(KERN_ERR "rsb wait queue:\n");
198 	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
199 		dlm_print_lkb(lkb);
200 }
201 
202 /* Threads cannot use the lockspace while it's being recovered */
203 
204 void dlm_lock_recovery(struct dlm_ls *ls)
205 {
206 	down_read(&ls->ls_in_recovery);
207 }
208 
209 void dlm_unlock_recovery(struct dlm_ls *ls)
210 {
211 	up_read(&ls->ls_in_recovery);
212 }
213 
214 int dlm_lock_recovery_try(struct dlm_ls *ls)
215 {
216 	return down_read_trylock(&ls->ls_in_recovery);
217 }
218 
219 static inline int can_be_queued(struct dlm_lkb *lkb)
220 {
221 	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
222 }
223 
224 static inline int force_blocking_asts(struct dlm_lkb *lkb)
225 {
226 	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
227 }
228 
229 static inline int is_demoted(struct dlm_lkb *lkb)
230 {
231 	return test_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
232 }
233 
234 static inline int is_altmode(struct dlm_lkb *lkb)
235 {
236 	return test_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
237 }
238 
239 static inline int is_granted(struct dlm_lkb *lkb)
240 {
241 	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
242 }
243 
244 static inline int is_remote(struct dlm_rsb *r)
245 {
246 	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
247 	return !!r->res_nodeid;
248 }
249 
250 static inline int is_process_copy(struct dlm_lkb *lkb)
251 {
252 	return lkb->lkb_nodeid &&
253 	       !test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
254 }
255 
256 static inline int is_master_copy(struct dlm_lkb *lkb)
257 {
258 	return test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
259 }
260 
261 static inline int middle_conversion(struct dlm_lkb *lkb)
262 {
263 	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264 	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265 		return 1;
266 	return 0;
267 }
268 
269 static inline int down_conversion(struct dlm_lkb *lkb)
270 {
271 	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272 }
273 
274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
275 {
276 	return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
277 }
278 
279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
280 {
281 	return test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
282 }
283 
284 static inline int is_overlap(struct dlm_lkb *lkb)
285 {
286 	return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags) ||
287 	       test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
288 }
289 
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291 {
292 	if (is_master_copy(lkb))
293 		return;
294 
295 	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
296 
297 	if (rv == -DLM_ECANCEL &&
298 	    test_and_clear_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags))
299 		rv = -EDEADLK;
300 
301 	dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, dlm_sbflags_val(lkb));
302 }
303 
304 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
305 {
306 	queue_cast(r, lkb,
307 		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
308 }
309 
310 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
311 {
312 	if (is_master_copy(lkb)) {
313 		send_bast(r, lkb, rqmode);
314 	} else {
315 		dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
316 	}
317 }
318 
319 /*
320  * Basic operations on rsb's and lkb's
321  */
322 
323 static inline unsigned long rsb_toss_jiffies(void)
324 {
325 	return jiffies + (READ_ONCE(dlm_config.ci_toss_secs) * HZ);
326 }
327 
328 /* This is only called to add a reference when the code already holds
329    a valid reference to the rsb, so there's no need for locking. */
330 
331 static inline void hold_rsb(struct dlm_rsb *r)
332 {
333 	/* inactive rsbs are not ref counted */
334 	WARN_ON(rsb_flag(r, RSB_INACTIVE));
335 	kref_get(&r->res_ref);
336 }
337 
338 void dlm_hold_rsb(struct dlm_rsb *r)
339 {
340 	hold_rsb(r);
341 }
342 
343 /* TODO move this to lib/refcount.c */
344 static __must_check bool
345 dlm_refcount_dec_and_write_lock_bh(refcount_t *r, rwlock_t *lock)
346 __cond_acquires(lock)
347 {
348 	if (refcount_dec_not_one(r))
349 		return false;
350 
351 	write_lock_bh(lock);
352 	if (!refcount_dec_and_test(r)) {
353 		write_unlock_bh(lock);
354 		return false;
355 	}
356 
357 	return true;
358 }
359 
360 /* TODO move this to include/linux/kref.h */
361 static inline int dlm_kref_put_write_lock_bh(struct kref *kref,
362 					     void (*release)(struct kref *kref),
363 					     rwlock_t *lock)
364 {
365 	if (dlm_refcount_dec_and_write_lock_bh(&kref->refcount, lock)) {
366 		release(kref);
367 		return 1;
368 	}
369 
370 	return 0;
371 }
372 
373 static void put_rsb(struct dlm_rsb *r)
374 {
375 	struct dlm_ls *ls = r->res_ls;
376 	int rv;
377 
378 	rv = dlm_kref_put_write_lock_bh(&r->res_ref, deactivate_rsb,
379 					&ls->ls_rsbtbl_lock);
380 	if (rv)
381 		write_unlock_bh(&ls->ls_rsbtbl_lock);
382 }
383 
384 void dlm_put_rsb(struct dlm_rsb *r)
385 {
386 	put_rsb(r);
387 }
388 
389 /* connected with timer_delete_sync() in dlm_ls_stop(): stop starting
390  * new timers when recovery is triggered, and don't run them
391  * again until resume_scan_timer() re-enables them.
392  */
393 static void enable_scan_timer(struct dlm_ls *ls, unsigned long jiffies)
394 {
395 	if (!dlm_locking_stopped(ls))
396 		mod_timer(&ls->ls_scan_timer, jiffies);
397 }
398 
399 /* This function tries to resume the timer callback if an rsb
400  * is on the scan list and no timer is pending. It might be that
401  * the first entry is currently being executed as the timer callback,
402  * but we don't care if a timer is queued up again and does
403  * nothing. Should be a rare case.
404  */
405 void resume_scan_timer(struct dlm_ls *ls)
406 {
407 	struct dlm_rsb *r;
408 
409 	spin_lock_bh(&ls->ls_scan_lock);
410 	r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
411 				     res_scan_list);
412 	if (r && !timer_pending(&ls->ls_scan_timer))
413 		enable_scan_timer(ls, r->res_toss_time);
414 	spin_unlock_bh(&ls->ls_scan_lock);
415 }
416 
417 /* ls_rsbtbl_lock must be held */
418 
419 static void del_scan(struct dlm_ls *ls, struct dlm_rsb *r)
420 {
421 	struct dlm_rsb *first;
422 
423 	/* active rsbs should never be on the scan list */
424 	WARN_ON(!rsb_flag(r, RSB_INACTIVE));
425 
426 	spin_lock_bh(&ls->ls_scan_lock);
427 	r->res_toss_time = 0;
428 
429 	/* if the rsb is not queued do nothing */
430 	if (list_empty(&r->res_scan_list))
431 		goto out;
432 
433 	/* get the first element before delete */
434 	first = list_first_entry(&ls->ls_scan_list, struct dlm_rsb,
435 				 res_scan_list);
436 	list_del_init(&r->res_scan_list);
437 	/* check if the first element was the rsb we deleted */
438 	if (first == r) {
439 		/* try to get the new first element, if the list
440 		 * is empty now try to delete the timer, if we are
441 		 * too late we don't care.
442 		 *
443 		 * if the list isn't empty and a new first element got
444 		 * in place, set the new timer expire time.
445 		 */
446 		first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
447 						 res_scan_list);
448 		if (!first)
449 			timer_delete(&ls->ls_scan_timer);
450 		else
451 			enable_scan_timer(ls, first->res_toss_time);
452 	}
453 
454 out:
455 	spin_unlock_bh(&ls->ls_scan_lock);
456 }
457 
458 static void add_scan(struct dlm_ls *ls, struct dlm_rsb *r)
459 {
460 	int our_nodeid = dlm_our_nodeid();
461 	struct dlm_rsb *first;
462 
463 	/* A dir record for a remote master rsb should never be on the scan list. */
464 	WARN_ON(!dlm_no_directory(ls) &&
465 		(r->res_master_nodeid != our_nodeid) &&
466 		(dlm_dir_nodeid(r) == our_nodeid));
467 
468 	/* An active rsb should never be on the scan list. */
469 	WARN_ON(!rsb_flag(r, RSB_INACTIVE));
470 
471 	/* An rsb should not already be on the scan list. */
472 	WARN_ON(!list_empty(&r->res_scan_list));
473 
474 	spin_lock_bh(&ls->ls_scan_lock);
475 	/* set the new rsb absolute expire time in the rsb */
476 	r->res_toss_time = rsb_toss_jiffies();
477 	if (list_empty(&ls->ls_scan_list)) {
478 		/* if the queue is empty, add the element; its
479 		 * expire time is our new expiration
480 		 */
481 		list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
482 		enable_scan_timer(ls, r->res_toss_time);
483 	} else {
484 		/* get the (possibly new) first element, then add this
485 		 * rsb, which has the newest expire time, to the end of
486 		 * the queue. If the list was empty, this rsb's expire
487 		 * time is our next expiration; otherwise the first
488 		 * element's expire time is our new expiration time.
489 		 */
490 		first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
491 						 res_scan_list);
492 		list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
493 		if (!first)
494 			enable_scan_timer(ls, r->res_toss_time);
495 		else
496 			enable_scan_timer(ls, first->res_toss_time);
497 	}
498 	spin_unlock_bh(&ls->ls_scan_lock);
499 }
500 
501 /* if we hit contention we retry the trylock after 250 ms.
502  * if any other mod_timer in between makes the timer expire
503  * earlier we don't care; this is only for the unlikely case
504  * that nothing happened in this time.
505  */
506 #define DLM_TOSS_TIMER_RETRY	(jiffies + msecs_to_jiffies(250))
507 
508 /* Called by lockspace scan_timer to free unused rsb's. */
509 
510 void dlm_rsb_scan(struct timer_list *timer)
511 {
512 	struct dlm_ls *ls = from_timer(ls, timer, ls_scan_timer);
513 	int our_nodeid = dlm_our_nodeid();
514 	struct dlm_rsb *r;
515 	int rv;
516 
517 	while (1) {
518 		/* interruption point to leave the iteration when
519 		 * recovery waits for timer_delete_sync(); recovery
520 		 * will take care of deleting everything on the scan list.
521 		 */
522 		if (dlm_locking_stopped(ls))
523 			break;
524 
525 		rv = spin_trylock(&ls->ls_scan_lock);
526 		if (!rv) {
527 			/* rearm the retry timer */
528 			enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
529 			break;
530 		}
531 
532 		r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
533 					     res_scan_list);
534 		if (!r) {
535 			/* the next add_scan will enable the timer again */
536 			spin_unlock(&ls->ls_scan_lock);
537 			break;
538 		}
539 
540 		/*
541 		 * If the first rsb is not yet expired, then stop because the
542 		 * list is sorted with nearest expiration first.
543 		 */
544 		if (time_before(jiffies, r->res_toss_time)) {
545 			/* rearm with the next rsb to expire in the future */
546 			enable_scan_timer(ls, r->res_toss_time);
547 			spin_unlock(&ls->ls_scan_lock);
548 			break;
549 		}
550 
551 		/* in find_rsb_dir/nodir these locks are taken in the
552 		 * reverse order; however this is only a trylock, so if
553 		 * we hit possible contention we just try again.
554 		 */
555 		rv = write_trylock(&ls->ls_rsbtbl_lock);
556 		if (!rv) {
557 			spin_unlock(&ls->ls_scan_lock);
558 			/* rearm the retry timer */
559 			enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
560 			break;
561 		}
562 
563 		list_del(&r->res_slow_list);
564 		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
565 				       dlm_rhash_rsb_params);
566 		rsb_clear_flag(r, RSB_HASHED);
567 
568 		/* ls_rsbtbl_lock is not needed when calling send_remove() */
569 		write_unlock(&ls->ls_rsbtbl_lock);
570 
571 		list_del_init(&r->res_scan_list);
572 		spin_unlock(&ls->ls_scan_lock);
573 
574 		/* An rsb that is a dir record for a remote master rsb
575 		 * cannot be removed, and should not have a timer enabled.
576 		 */
577 		WARN_ON(!dlm_no_directory(ls) &&
578 			(r->res_master_nodeid != our_nodeid) &&
579 			(dlm_dir_nodeid(r) == our_nodeid));
580 
581 		/* We're the master of this rsb but we're not
582 		 * the directory record, so we need to tell the
583 		 * dir node to remove the dir record
584 		 */
585 		if (!dlm_no_directory(ls) &&
586 		    (r->res_master_nodeid == our_nodeid) &&
587 		    (dlm_dir_nodeid(r) != our_nodeid))
588 			send_remove(r);
589 
590 		free_inactive_rsb(r);
591 	}
592 }
593 
594 /* Allocate and initialize a new rsb struct for the given name;
595    returns -ENOMEM if the allocation fails.  The caller adds it
596    to the rsb hash table. */
597 
598 static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
599 			  struct dlm_rsb **r_ret)
600 {
601 	struct dlm_rsb *r;
602 
603 	r = dlm_allocate_rsb();
604 	if (!r)
605 		return -ENOMEM;
606 
607 	r->res_ls = ls;
608 	r->res_length = len;
609 	memcpy(r->res_name, name, len);
610 	spin_lock_init(&r->res_lock);
611 
612 	INIT_LIST_HEAD(&r->res_lookup);
613 	INIT_LIST_HEAD(&r->res_grantqueue);
614 	INIT_LIST_HEAD(&r->res_convertqueue);
615 	INIT_LIST_HEAD(&r->res_waitqueue);
616 	INIT_LIST_HEAD(&r->res_root_list);
617 	INIT_LIST_HEAD(&r->res_scan_list);
618 	INIT_LIST_HEAD(&r->res_recover_list);
619 	INIT_LIST_HEAD(&r->res_masters_list);
620 
621 	*r_ret = r;
622 	return 0;
623 }
624 
625 int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
626 			struct dlm_rsb **r_ret)
627 {
628 	char key[DLM_RESNAME_MAXLEN] = {};
629 
630 	memcpy(key, name, len);
631 	*r_ret = rhashtable_lookup_fast(rhash, &key, dlm_rhash_rsb_params);
632 	if (*r_ret)
633 		return 0;
634 
635 	return -EBADR;
636 }
637 
638 static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
639 {
640 	int rv;
641 
642 	rv = rhashtable_insert_fast(rhash, &rsb->res_node,
643 				    dlm_rhash_rsb_params);
644 	if (!rv)
645 		rsb_set_flag(rsb, RSB_HASHED);
646 
647 	return rv;
648 }
649 
650 /*
651  * Find rsb in rsbtbl and potentially create/add one
652  *
653  * Delaying the release of rsb's has a similar benefit to applications keeping
654  * NL locks on an rsb, but without the guarantee that the cached master value
655  * will still be valid when the rsb is reused.  Apps aren't always smart enough
656  * to keep NL locks on an rsb that they may lock again shortly; this can lead
657  * to excessive master lookups and removals if we don't delay the release.
658  *
659  * Searching for an rsb means looking through both the normal list and toss
660  * list.  When found on the toss list the rsb is moved to the normal list with
661  * ref count of 1; when found on normal list the ref count is incremented.
662  *
663  * rsb's on the keep list are being used locally and refcounted.
664  * rsb's on the toss list are not being used locally, and are not refcounted.
665  *
666  * The toss list rsb's were either
667  * - previously used locally but not any more (were on keep list, then
668  *   moved to toss list when last refcount dropped)
669  * - created and put on toss list as a directory record for a lookup
670  *   (we are the dir node for the res, but are not using the res right now,
671  *   but some other node is)
672  *
673  * The purpose of find_rsb() is to return a refcounted rsb for local use.
674  * So, if the given rsb is on the toss list, it is moved to the keep list
675  * before being returned.
676  *
677  * deactivate_rsb() happens when all local usage of the rsb is done, i.e. no
678  * more refcounts exist, so the rsb is moved from the keep list to the
679  * toss list.
680  *
681  * rsb's on both keep and toss lists are used for doing a name to master
682  * lookups.  rsb's that are in use locally (and being refcounted) are on
683  * the keep list, rsb's that are not in use locally (not refcounted) and
684  * only exist for name/master lookups are on the toss list.
685  *
686  * rsb's on the toss list whose dir_nodeid is not local can have stale
687  * name/master mappings.  So, remote requests on such rsb's can potentially
688  * return with an error, which means the mapping is stale and needs to
689  * be updated with a new lookup.  (The idea behind MASTER UNCERTAIN and
690  * first_lkid is to keep only a single outstanding request on an rsb
691  * while that rsb has a potentially stale master.)
692  */
693 
694 static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
695 			uint32_t hash, int dir_nodeid, int from_nodeid,
696 			unsigned int flags, struct dlm_rsb **r_ret)
697 {
698 	struct dlm_rsb *r = NULL;
699 	int our_nodeid = dlm_our_nodeid();
700 	int from_local = 0;
701 	int from_other = 0;
702 	int from_dir = 0;
703 	int create = 0;
704 	int error;
705 
706 	if (flags & R_RECEIVE_REQUEST) {
707 		if (from_nodeid == dir_nodeid)
708 			from_dir = 1;
709 		else
710 			from_other = 1;
711 	} else if (flags & R_REQUEST) {
712 		from_local = 1;
713 	}
714 
715 	/*
716 	 * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
717 	 * from_nodeid has sent us a lock in dlm_recover_locks, believing
718 	 * we're the new master.  Our local recovery may not have set
719 	 * res_master_nodeid to our_nodeid yet, so allow either.  Don't
720 	 * create the rsb; dlm_recover_process_copy() will handle EBADR
721 	 * by resending.
722 	 *
723 	 * If someone sends us a request, we are the dir node, and we do
724 	 * not find the rsb anywhere, then recreate it.  This happens if
725 	 * someone sends us a request after we have removed/freed an rsb.
726 	 * (They sent a request instead of lookup because they are using
727 	 * an rsb taken from their scan list.)
728 	 */
729 
730 	if (from_local || from_dir ||
731 	    (from_other && (dir_nodeid == our_nodeid))) {
732 		create = 1;
733 	}
734 
735  retry:
736 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
737 	if (error)
738 		goto do_new;
739 
740 	/* check if the rsb is active under read lock - likely path */
741 	read_lock_bh(&ls->ls_rsbtbl_lock);
742 	if (!rsb_flag(r, RSB_HASHED)) {
743 		read_unlock_bh(&ls->ls_rsbtbl_lock);
744 		goto do_new;
745 	}
746 
747 	/*
748 	 * rsb is active, so we can't check master_nodeid without lock_rsb.
749 	 */
750 
751 	if (rsb_flag(r, RSB_INACTIVE)) {
752 		read_unlock_bh(&ls->ls_rsbtbl_lock);
753 		goto do_inactive;
754 	}
755 
756 	kref_get(&r->res_ref);
757 	read_unlock_bh(&ls->ls_rsbtbl_lock);
758 	goto out;
759 
760 
761  do_inactive:
762 	write_lock_bh(&ls->ls_rsbtbl_lock);
763 
764 	/*
765 	 * The expectation here is that the rsb will have HASHED and
766 	 * INACTIVE flags set, and that the rsb can be moved from
767 	 * inactive back to active again.  However, between releasing
768 	 * the read lock and acquiring the write lock, this rsb could
769 	 * have been removed from rsbtbl, and had HASHED cleared, to
770 	 * be freed.  To deal with this case, we would normally need
771 	 * to repeat dlm_search_rsb_tree while holding the write lock,
772 	 * but rcu allows us to simply check the HASHED flag, because
773 	 * the rcu read lock means the rsb will not be freed yet.
774 	 * If the HASHED flag is not set, then the rsb is being freed,
775 	 * so we add a new rsb struct.  If the HASHED flag is set,
776 	 * and INACTIVE is not set, it means another thread has
777 	 * made the rsb active, as we're expecting to do here, and
778 	 * we just repeat the lookup (this will be very unlikely.)
779 	 */
780 	if (rsb_flag(r, RSB_HASHED)) {
781 		if (!rsb_flag(r, RSB_INACTIVE)) {
782 			write_unlock_bh(&ls->ls_rsbtbl_lock);
783 			goto retry;
784 		}
785 	} else {
786 		write_unlock_bh(&ls->ls_rsbtbl_lock);
787 		goto do_new;
788 	}
789 
790 	/*
791 	 * rsb found inactive (master_nodeid may be out of date unless
792 	 * we are the dir_nodeid or were the master)  No other thread
793 	 * is using this rsb because it's inactive, so we can
794 	 * look at or update res_master_nodeid without lock_rsb.
795 	 */
796 
797 	if ((r->res_master_nodeid != our_nodeid) && from_other) {
798 		/* our rsb was not master, and another node (not the dir node)
799 		   has sent us a request */
800 		log_debug(ls, "find_rsb inactive from_other %d master %d dir %d %s",
801 			  from_nodeid, r->res_master_nodeid, dir_nodeid,
802 			  r->res_name);
803 		write_unlock_bh(&ls->ls_rsbtbl_lock);
804 		error = -ENOTBLK;
805 		goto out;
806 	}
807 
808 	if ((r->res_master_nodeid != our_nodeid) && from_dir) {
809 		/* don't think this should ever happen */
810 		log_error(ls, "find_rsb inactive from_dir %d master %d",
811 			  from_nodeid, r->res_master_nodeid);
812 		dlm_print_rsb(r);
813 		/* fix it and go on */
814 		r->res_master_nodeid = our_nodeid;
815 		r->res_nodeid = 0;
816 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
817 		r->res_first_lkid = 0;
818 	}
819 
820 	if (from_local && (r->res_master_nodeid != our_nodeid)) {
821 		/* Because we have held no locks on this rsb,
822 		   res_master_nodeid could have become stale. */
823 		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
824 		r->res_first_lkid = 0;
825 	}
826 
827 	/* we always deactivate scan timer for the rsb, when
828 	 * we move it out of the inactive state as rsb state
829 	 * can be changed and scan timers are only for inactive
830 	 * rsbs.
831 	 */
832 	del_scan(ls, r);
833 	list_move(&r->res_slow_list, &ls->ls_slow_active);
834 	rsb_clear_flag(r, RSB_INACTIVE);
835 	kref_init(&r->res_ref); /* ref is now used in active state */
836 	write_unlock_bh(&ls->ls_rsbtbl_lock);
837 
838 	goto out;
839 
840 
841  do_new:
842 	/*
843 	 * rsb not found
844 	 */
845 
846 	if (error == -EBADR && !create)
847 		goto out;
848 
849 	error = get_rsb_struct(ls, name, len, &r);
850 	if (WARN_ON_ONCE(error))
851 		goto out;
852 
853 	r->res_hash = hash;
854 	r->res_dir_nodeid = dir_nodeid;
855 	kref_init(&r->res_ref);
856 
857 	if (from_dir) {
858 		/* want to see how often this happens */
859 		log_debug(ls, "find_rsb new from_dir %d recreate %s",
860 			  from_nodeid, r->res_name);
861 		r->res_master_nodeid = our_nodeid;
862 		r->res_nodeid = 0;
863 		goto out_add;
864 	}
865 
866 	if (from_other && (dir_nodeid != our_nodeid)) {
867 		/* should never happen */
868 		log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
869 			  from_nodeid, dir_nodeid, our_nodeid, r->res_name);
870 		dlm_free_rsb(r);
871 		r = NULL;
872 		error = -ENOTBLK;
873 		goto out;
874 	}
875 
876 	if (from_other) {
877 		log_debug(ls, "find_rsb new from_other %d dir %d %s",
878 			  from_nodeid, dir_nodeid, r->res_name);
879 	}
880 
881 	if (dir_nodeid == our_nodeid) {
882 		/* When we are the dir nodeid, we can set the master
883 		   node immediately */
884 		r->res_master_nodeid = our_nodeid;
885 		r->res_nodeid = 0;
886 	} else {
887 		/* set_master will send_lookup to dir_nodeid */
888 		r->res_master_nodeid = 0;
889 		r->res_nodeid = -1;
890 	}
891 
892  out_add:
893 
894 	write_lock_bh(&ls->ls_rsbtbl_lock);
895 	error = rsb_insert(r, &ls->ls_rsbtbl);
896 	if (error == -EEXIST) {
897 		/* somebody else was faster and it seems the
898 		 * rsb exists now, we do a whole relookup
899 		 */
900 		write_unlock_bh(&ls->ls_rsbtbl_lock);
901 		dlm_free_rsb(r);
902 		goto retry;
903 	} else if (!error) {
904 		list_add(&r->res_slow_list, &ls->ls_slow_active);
905 	}
906 	write_unlock_bh(&ls->ls_rsbtbl_lock);
907  out:
908 	*r_ret = r;
909 	return error;
910 }
911 
912 /* During recovery, other nodes can send us new MSTCPY locks (from
913    dlm_recover_locks) before we've made ourselves master (in
914    dlm_recover_masters). */
915 
916 static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
917 			  uint32_t hash, int dir_nodeid, int from_nodeid,
918 			  unsigned int flags, struct dlm_rsb **r_ret)
919 {
920 	struct dlm_rsb *r = NULL;
921 	int our_nodeid = dlm_our_nodeid();
922 	int recover = (flags & R_RECEIVE_RECOVER);
923 	int error;
924 
925  retry:
926 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
927 	if (error)
928 		goto do_new;
929 
930 	/* check if the rsb is in active state under read lock - likely path */
931 	read_lock_bh(&ls->ls_rsbtbl_lock);
932 	if (!rsb_flag(r, RSB_HASHED)) {
933 		read_unlock_bh(&ls->ls_rsbtbl_lock);
934 		goto do_new;
935 	}
936 
937 	if (rsb_flag(r, RSB_INACTIVE)) {
938 		read_unlock_bh(&ls->ls_rsbtbl_lock);
939 		goto do_inactive;
940 	}
941 
942 	/*
943 	 * rsb is active, so we can't check master_nodeid without lock_rsb.
944 	 */
945 
946 	kref_get(&r->res_ref);
947 	read_unlock_bh(&ls->ls_rsbtbl_lock);
948 
949 	goto out;
950 
951 
952  do_inactive:
953 	write_lock_bh(&ls->ls_rsbtbl_lock);
954 
955 	/* See comment in find_rsb_dir. */
956 	if (rsb_flag(r, RSB_HASHED)) {
957 		if (!rsb_flag(r, RSB_INACTIVE)) {
958 			write_unlock_bh(&ls->ls_rsbtbl_lock);
959 			goto retry;
960 		}
961 	} else {
962 		write_unlock_bh(&ls->ls_rsbtbl_lock);
963 		goto do_new;
964 	}
965 
966 
967 	/*
968 	 * rsb found inactive. No other thread is using this rsb because
969 	 * it's inactive, so we can look at or update res_master_nodeid
970 	 * without lock_rsb.
971 	 */
972 
973 	if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
974 		/* our rsb is not master, and another node has sent us a
975 		   request; this should never happen */
976 		log_error(ls, "find_rsb inactive from_nodeid %d master %d dir %d",
977 			  from_nodeid, r->res_master_nodeid, dir_nodeid);
978 		dlm_print_rsb(r);
979 		write_unlock_bh(&ls->ls_rsbtbl_lock);
980 		error = -ENOTBLK;
981 		goto out;
982 	}
983 
984 	if (!recover && (r->res_master_nodeid != our_nodeid) &&
985 	    (dir_nodeid == our_nodeid)) {
986 		/* our rsb is not master, and we are dir; may as well fix it;
987 		   this should never happen */
988 		log_error(ls, "find_rsb inactive our %d master %d dir %d",
989 			  our_nodeid, r->res_master_nodeid, dir_nodeid);
990 		dlm_print_rsb(r);
991 		r->res_master_nodeid = our_nodeid;
992 		r->res_nodeid = 0;
993 	}
994 
995 	del_scan(ls, r);
996 	list_move(&r->res_slow_list, &ls->ls_slow_active);
997 	rsb_clear_flag(r, RSB_INACTIVE);
998 	kref_init(&r->res_ref);
999 	write_unlock_bh(&ls->ls_rsbtbl_lock);
1000 
1001 	goto out;
1002 
1003 
1004  do_new:
1005 	/*
1006 	 * rsb not found
1007 	 */
1008 
1009 	error = get_rsb_struct(ls, name, len, &r);
1010 	if (WARN_ON_ONCE(error))
1011 		goto out;
1012 
1013 	r->res_hash = hash;
1014 	r->res_dir_nodeid = dir_nodeid;
1015 	r->res_master_nodeid = dir_nodeid;
1016 	r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
1017 	kref_init(&r->res_ref);
1018 
1019 	write_lock_bh(&ls->ls_rsbtbl_lock);
1020 	error = rsb_insert(r, &ls->ls_rsbtbl);
1021 	if (error == -EEXIST) {
1022 		/* somebody else was faster and it seems the
1023 		 * rsb exists now, we do a whole relookup
1024 		 */
1025 		write_unlock_bh(&ls->ls_rsbtbl_lock);
1026 		dlm_free_rsb(r);
1027 		goto retry;
1028 	} else if (!error) {
1029 		list_add(&r->res_slow_list, &ls->ls_slow_active);
1030 	}
1031 	write_unlock_bh(&ls->ls_rsbtbl_lock);
1032 
1033  out:
1034 	*r_ret = r;
1035 	return error;
1036 }
1037 
1038 /*
1039  * rsb rcu usage
1040  *
1041  * While rcu read lock is held, the rsb cannot be freed,
1042  * which allows a lookup optimization.
1043  *
1044  * Two threads are accessing the same rsb concurrently,
1045  * the first (A) is trying to use the rsb, the second (B)
1046  * is trying to free the rsb.
1047  *
1048  * thread A                 thread B
1049  * (trying to use rsb)      (trying to free rsb)
1050  *
1051  * A1. rcu read lock
1052  * A2. rsbtbl read lock
1053  * A3. look up rsb in rsbtbl
1054  * A4. rsbtbl read unlock
1055  *                          B1. rsbtbl write lock
1056  *                          B2. look up rsb in rsbtbl
1057  *                          B3. remove rsb from rsbtbl
1058  *                          B4. clear rsb HASHED flag
1059  *                          B5. rsbtbl write unlock
1060  *                          B6. begin freeing rsb using rcu...
1061  *
1062  * (rsb is inactive, so try to make it active again)
1063  * A5. read rsb HASHED flag (safe because rsb is not freed yet)
1064  * A6. the rsb HASHED flag is not set, which it means the rsb
1065  *     is being removed from rsbtbl and freed, so don't use it.
1066  * A7. rcu read unlock
1067  *
1068  *                          B7. ...finish freeing rsb using rcu
1069  * A8. create a new rsb
1070  *
1071  * Without the rcu optimization, steps A5-8 would need to do
1072  * an extra rsbtbl lookup:
1073  * A5. rsbtbl write lock
1074  * A6. look up rsb in rsbtbl, not found
1075  * A7. rsbtbl write unlock
1076  * A8. create a new rsb
1077  */
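/* A minimal sketch of the reader-side pattern used by the find_rsb
 * helpers below (details such as the inactive-to-active transition
 * are omitted here):
 *
 *	rcu_read_lock();
 *	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
 *	read_lock_bh(&ls->ls_rsbtbl_lock);
 *	if (!rsb_flag(r, RSB_HASHED)) {
 *		read_unlock_bh(&ls->ls_rsbtbl_lock);
 *		... rsb is being freed, allocate a new one ...
 *	}
 *	kref_get(&r->res_ref);
 *	read_unlock_bh(&ls->ls_rsbtbl_lock);
 *	rcu_read_unlock();
 */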
1078 
1079 static int find_rsb(struct dlm_ls *ls, const void *name, int len,
1080 		    int from_nodeid, unsigned int flags,
1081 		    struct dlm_rsb **r_ret)
1082 {
1083 	int dir_nodeid;
1084 	uint32_t hash;
1085 	int rv;
1086 
1087 	if (len > DLM_RESNAME_MAXLEN)
1088 		return -EINVAL;
1089 
1090 	hash = jhash(name, len, 0);
1091 	dir_nodeid = dlm_hash2nodeid(ls, hash);
1092 
1093 	rcu_read_lock();
1094 	if (dlm_no_directory(ls))
1095 		rv = find_rsb_nodir(ls, name, len, hash, dir_nodeid,
1096 				      from_nodeid, flags, r_ret);
1097 	else
1098 		rv = find_rsb_dir(ls, name, len, hash, dir_nodeid,
1099 				    from_nodeid, flags, r_ret);
1100 	rcu_read_unlock();
1101 	return rv;
1102 }
1103 
1104 /* we have received a request and found that res_master_nodeid != our_nodeid,
1105    so we need to return an error or make ourselves the master */
1106 
1107 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
1108 				  int from_nodeid)
1109 {
1110 	if (dlm_no_directory(ls)) {
1111 		log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
1112 			  from_nodeid, r->res_master_nodeid,
1113 			  r->res_dir_nodeid);
1114 		dlm_print_rsb(r);
1115 		return -ENOTBLK;
1116 	}
1117 
1118 	if (from_nodeid != r->res_dir_nodeid) {
1119 		/* our rsb is not master, and another node (not the dir node)
1120 	   	   has sent us a request.  this is much more common when our
1121 	   	   master_nodeid is zero, so limit debug to non-zero.  */
1122 
1123 		if (r->res_master_nodeid) {
1124 			log_debug(ls, "validate master from_other %d master %d "
1125 				  "dir %d first %x %s", from_nodeid,
1126 				  r->res_master_nodeid, r->res_dir_nodeid,
1127 				  r->res_first_lkid, r->res_name);
1128 		}
1129 		return -ENOTBLK;
1130 	} else {
1131 		/* our rsb is not master, but the dir nodeid has sent us a
1132 	   	   request; this could happen with master 0 / res_nodeid -1 */
1133 
1134 		if (r->res_master_nodeid) {
1135 			log_error(ls, "validate master from_dir %d master %d "
1136 				  "first %x %s",
1137 				  from_nodeid, r->res_master_nodeid,
1138 				  r->res_first_lkid, r->res_name);
1139 		}
1140 
1141 		r->res_master_nodeid = dlm_our_nodeid();
1142 		r->res_nodeid = 0;
1143 		return 0;
1144 	}
1145 }
1146 
1147 static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
1148 				int from_nodeid, bool is_inactive, unsigned int flags,
1149 				int *r_nodeid, int *result)
1150 {
1151 	int fix_master = (flags & DLM_LU_RECOVER_MASTER);
1152 	int from_master = (flags & DLM_LU_RECOVER_DIR);
1153 
1154 	if (r->res_dir_nodeid != our_nodeid) {
1155 		/* should not happen, but may as well fix it and carry on */
1156 		log_error(ls, "%s res_dir %d our %d %s", __func__,
1157 			  r->res_dir_nodeid, our_nodeid, r->res_name);
1158 		r->res_dir_nodeid = our_nodeid;
1159 	}
1160 
1161 	if (fix_master && r->res_master_nodeid && dlm_is_removed(ls, r->res_master_nodeid)) {
1162 		/* Recovery uses this function to set a new master when
1163 		 * the previous master failed.  Setting NEW_MASTER will
1164 		 * force dlm_recover_masters to call recover_master on this
1165 		 * rsb even though the res_nodeid is no longer removed.
1166 		 */
1167 
1168 		r->res_master_nodeid = from_nodeid;
1169 		r->res_nodeid = from_nodeid;
1170 		rsb_set_flag(r, RSB_NEW_MASTER);
1171 
1172 		if (is_inactive) {
1173 			/* I don't think we should ever find it inactive. */
1174 			log_error(ls, "%s fix_master inactive", __func__);
1175 			dlm_dump_rsb(r);
1176 		}
1177 	}
1178 
1179 	if (from_master && (r->res_master_nodeid != from_nodeid)) {
1180 		/* this will happen if from_nodeid became master during
1181 		 * a previous recovery cycle, and we aborted the previous
1182 		 * cycle before recovering this master value
1183 		 */
1184 
1185 		log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
1186 			  __func__, from_nodeid, r->res_master_nodeid,
1187 			  r->res_nodeid, r->res_first_lkid, r->res_name);
1188 
1189 		if (r->res_master_nodeid == our_nodeid) {
1190 			log_error(ls, "from_master %d our_master", from_nodeid);
1191 			dlm_dump_rsb(r);
1192 			goto ret_assign;
1193 		}
1194 
1195 		r->res_master_nodeid = from_nodeid;
1196 		r->res_nodeid = from_nodeid;
1197 		rsb_set_flag(r, RSB_NEW_MASTER);
1198 	}
1199 
1200 	if (!r->res_master_nodeid) {
1201 		/* this will happen if recovery happens while we're looking
1202 		 * up the master for this rsb
1203 		 */
1204 
1205 		log_debug(ls, "%s master 0 to %d first %x %s", __func__,
1206 			  from_nodeid, r->res_first_lkid, r->res_name);
1207 		r->res_master_nodeid = from_nodeid;
1208 		r->res_nodeid = from_nodeid;
1209 	}
1210 
1211 	if (!from_master && !fix_master &&
1212 	    (r->res_master_nodeid == from_nodeid)) {
1213 		/* this can happen when the master sends remove, the dir node
1214 		 * finds the rsb on the active list and ignores the remove,
1215 		 * and the former master sends a lookup
1216 		 */
1217 
1218 		log_limit(ls, "%s from master %d flags %x first %x %s",
1219 			  __func__, from_nodeid, flags, r->res_first_lkid,
1220 			  r->res_name);
1221 	}
1222 
1223  ret_assign:
1224 	*r_nodeid = r->res_master_nodeid;
1225 	if (result)
1226 		*result = DLM_LU_MATCH;
1227 }
1228 
1229 /*
1230  * We're the dir node for this res and another node wants to know the
1231  * master nodeid.  During normal operation (non recovery) this is only
1232  * called from receive_lookup(); master lookups when the local node is
1233  * the dir node are done by find_rsb().
1234  *
1235  * normal operation, we are the dir node for a resource
1236  * . _request_lock
1237  * . set_master
1238  * . send_lookup
1239  * . receive_lookup
1240  * . dlm_master_lookup flags 0
1241  *
1242  * recover directory, we are rebuilding dir for all resources
1243  * . dlm_recover_directory
1244  * . dlm_rcom_names
1245  *   remote node sends back the rsb names it is master of and we are dir of
1246  * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
1247  *   we either create new rsb setting remote node as master, or find existing
1248  *   rsb and set master to be the remote node.
1249  *
1250  * recover masters, we are finding the new master for resources
1251  * . dlm_recover_masters
1252  * . recover_master
1253  * . dlm_send_rcom_lookup
1254  * . receive_rcom_lookup
1255  * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
1256  */
1257 
1258 static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
1259 			      int len, unsigned int flags, int *r_nodeid, int *result)
1260 {
1261 	struct dlm_rsb *r = NULL;
1262 	uint32_t hash;
1263 	int our_nodeid = dlm_our_nodeid();
1264 	int dir_nodeid, error;
1265 
1266 	if (len > DLM_RESNAME_MAXLEN)
1267 		return -EINVAL;
1268 
1269 	if (from_nodeid == our_nodeid) {
1270 		log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
1271 			  our_nodeid, flags);
1272 		return -EINVAL;
1273 	}
1274 
1275 	hash = jhash(name, len, 0);
1276 	dir_nodeid = dlm_hash2nodeid(ls, hash);
1277 	if (dir_nodeid != our_nodeid) {
1278 		log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
1279 			  from_nodeid, dir_nodeid, our_nodeid, hash,
1280 			  ls->ls_num_nodes);
1281 		*r_nodeid = -1;
1282 		return -EINVAL;
1283 	}
1284 
1285  retry:
1286 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1287 	if (error)
1288 		goto not_found;
1289 
1290 	/* check if the rsb is active under read lock - likely path */
1291 	read_lock_bh(&ls->ls_rsbtbl_lock);
1292 	if (!rsb_flag(r, RSB_HASHED)) {
1293 		read_unlock_bh(&ls->ls_rsbtbl_lock);
1294 		goto not_found;
1295 	}
1296 
1297 	if (rsb_flag(r, RSB_INACTIVE)) {
1298 		read_unlock_bh(&ls->ls_rsbtbl_lock);
1299 		goto do_inactive;
1300 	}
1301 
1302 	/* because the rsb is active, we need to lock_rsb before
1303 	 * checking/changing res_master_nodeid
1304 	 */
1305 
1306 	hold_rsb(r);
1307 	read_unlock_bh(&ls->ls_rsbtbl_lock);
1308 	lock_rsb(r);
1309 
1310 	__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
1311 			    flags, r_nodeid, result);
1312 
1313 	/* the rsb was active */
1314 	unlock_rsb(r);
1315 	put_rsb(r);
1316 
1317 	return 0;
1318 
1319  do_inactive:
1320 	/* unlikely path - check if still part of ls_rsbtbl */
1321 	write_lock_bh(&ls->ls_rsbtbl_lock);
1322 
1323 	/* see comment in find_rsb_dir */
1324 	if (rsb_flag(r, RSB_HASHED)) {
1325 		if (!rsb_flag(r, RSB_INACTIVE)) {
1326 			write_unlock_bh(&ls->ls_rsbtbl_lock);
1327 			/* something has changed, very unlikely but
1328 			 * try again
1329 			 */
1330 			goto retry;
1331 		}
1332 	} else {
1333 		write_unlock_bh(&ls->ls_rsbtbl_lock);
1334 		goto not_found;
1335 	}
1336 
1337 	/* because the rsb is inactive, it's not refcounted and lock_rsb
1338 	   is not used, but is protected by the rsbtbl lock */
1339 
1340 	__dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
1341 			    r_nodeid, result);
1342 
1343 	/* A dir record rsb should never be on scan list.
1344 	 * Except when we are the dir and master node.
1345 	 * This function should only be called by the dir
1346 	 * node.
1347 	 */
1348 	WARN_ON(!list_empty(&r->res_scan_list) &&
1349 		r->res_master_nodeid != our_nodeid);
1350 
1351 	write_unlock_bh(&ls->ls_rsbtbl_lock);
1352 
1353 	return 0;
1354 
1355  not_found:
1356 	error = get_rsb_struct(ls, name, len, &r);
1357 	if (WARN_ON_ONCE(error))
1358 		goto out;
1359 
1360 	r->res_hash = hash;
1361 	r->res_dir_nodeid = our_nodeid;
1362 	r->res_master_nodeid = from_nodeid;
1363 	r->res_nodeid = from_nodeid;
1364 	rsb_set_flag(r, RSB_INACTIVE);
1365 
1366 	write_lock_bh(&ls->ls_rsbtbl_lock);
1367 	error = rsb_insert(r, &ls->ls_rsbtbl);
1368 	if (error == -EEXIST) {
1369 		/* somebody else was faster and it seems the
1370 		 * rsb exists now, we do a whole relookup
1371 		 */
1372 		write_unlock_bh(&ls->ls_rsbtbl_lock);
1373 		dlm_free_rsb(r);
1374 		goto retry;
1375 	} else if (error) {
1376 		write_unlock_bh(&ls->ls_rsbtbl_lock);
1377 		/* should never happen */
1378 		dlm_free_rsb(r);
1379 		goto retry;
1380 	}
1381 
1382 	list_add(&r->res_slow_list, &ls->ls_slow_inactive);
1383 	write_unlock_bh(&ls->ls_rsbtbl_lock);
1384 
1385 	if (result)
1386 		*result = DLM_LU_ADD;
1387 	*r_nodeid = from_nodeid;
1388  out:
1389 	return error;
1390 }
1391 
1392 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
1393 		      int len, unsigned int flags, int *r_nodeid, int *result)
1394 {
1395 	int rv;
1396 	rcu_read_lock();
1397 	rv = _dlm_master_lookup(ls, from_nodeid, name, len, flags, r_nodeid, result);
1398 	rcu_read_unlock();
1399 	return rv;
1400 }
1401 
1402 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1403 {
1404 	struct dlm_rsb *r;
1405 
1406 	read_lock_bh(&ls->ls_rsbtbl_lock);
1407 	list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
1408 		if (r->res_hash == hash)
1409 			dlm_dump_rsb(r);
1410 	}
1411 	read_unlock_bh(&ls->ls_rsbtbl_lock);
1412 }
1413 
1414 void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
1415 {
1416 	struct dlm_rsb *r = NULL;
1417 	int error;
1418 
1419 	rcu_read_lock();
1420 	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1421 	if (error)
1422 		goto out;
1423 
1424 	dlm_dump_rsb(r);
1425  out:
1426 	rcu_read_unlock();
1427 }
1428 
1429 static void deactivate_rsb(struct kref *kref)
1430 {
1431 	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1432 	struct dlm_ls *ls = r->res_ls;
1433 	int our_nodeid = dlm_our_nodeid();
1434 
1435 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1436 	rsb_set_flag(r, RSB_INACTIVE);
1437 	list_move(&r->res_slow_list, &ls->ls_slow_inactive);
1438 
1439 	/*
1440 	 * When the rsb becomes unused, there are two possibilities:
1441 	 * 1. Leave the inactive rsb in place (don't remove it).
1442 	 * 2. Add it to the scan list to be removed.
1443 	 *
1444 	 * 1 is done when the rsb is acting as the dir record
1445 	 * for a remotely mastered rsb.  The rsb must be left
1446 	 * in place as an inactive rsb to act as the dir record.
1447 	 *
1448 	 * 2 is done when a) the rsb is not the master and not the
1449 	 * dir record, b) when the rsb is both the master and the
1450 	 * dir record, c) when the rsb is master but not dir record.
1451 	 *
1452 	 * (If no directory is used, the rsb can always be removed.)
1453 	 */
1454 	if (dlm_no_directory(ls) ||
1455 	    (r->res_master_nodeid == our_nodeid ||
1456 	     dlm_dir_nodeid(r) != our_nodeid))
1457 		add_scan(ls, r);
1458 
1459 	if (r->res_lvbptr) {
1460 		dlm_free_lvb(r->res_lvbptr);
1461 		r->res_lvbptr = NULL;
1462 	}
1463 }
1464 
1465 void free_inactive_rsb(struct dlm_rsb *r)
1466 {
1467 	WARN_ON_ONCE(!rsb_flag(r, RSB_INACTIVE));
1468 
1469 	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1470 	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1471 	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1472 	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1473 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1474 	DLM_ASSERT(list_empty(&r->res_scan_list), dlm_dump_rsb(r););
1475 	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1476 	DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r););
1477 
1478 	dlm_free_rsb(r);
1479 }
1480 
1481 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1482    The rsb must exist as long as any lkb's for it do. */
1483 
1484 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1485 {
1486 	hold_rsb(r);
1487 	lkb->lkb_resource = r;
1488 }
1489 
1490 static void detach_lkb(struct dlm_lkb *lkb)
1491 {
1492 	if (lkb->lkb_resource) {
1493 		put_rsb(lkb->lkb_resource);
1494 		lkb->lkb_resource = NULL;
1495 	}
1496 }
1497 
1498 static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
1499 		       unsigned long start, unsigned long end)
1500 {
1501 	struct xa_limit limit;
1502 	struct dlm_lkb *lkb;
1503 	int rv;
1504 
1505 	limit.max = end;
1506 	limit.min = start;
1507 
1508 	lkb = dlm_allocate_lkb();
1509 	if (!lkb)
1510 		return -ENOMEM;
1511 
1512 	lkb->lkb_last_bast_cb_mode = DLM_LOCK_IV;
1513 	lkb->lkb_last_cast_cb_mode = DLM_LOCK_IV;
1514 	lkb->lkb_last_cb_mode = DLM_LOCK_IV;
1515 	lkb->lkb_nodeid = -1;
1516 	lkb->lkb_grmode = DLM_LOCK_IV;
1517 	kref_init(&lkb->lkb_ref);
1518 	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1519 	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1520 
1521 	write_lock_bh(&ls->ls_lkbxa_lock);
1522 	rv = xa_alloc(&ls->ls_lkbxa, &lkb->lkb_id, lkb, limit, GFP_ATOMIC);
1523 	write_unlock_bh(&ls->ls_lkbxa_lock);
1524 
1525 	if (rv < 0) {
1526 		log_error(ls, "create_lkb xa error %d", rv);
1527 		dlm_free_lkb(lkb);
1528 		return rv;
1529 	}
1530 
1531 	*lkb_ret = lkb;
1532 	return 0;
1533 }
1534 
1535 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1536 {
1537 	return _create_lkb(ls, lkb_ret, 1, ULONG_MAX);
1538 }
1539 
1540 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1541 {
1542 	struct dlm_lkb *lkb;
1543 
1544 	rcu_read_lock();
1545 	lkb = xa_load(&ls->ls_lkbxa, lkid);
1546 	if (lkb) {
1547 		/* check if lkb is still part of lkbxa under lkbxa_lock as
1548 		 * the lkb_ref is tied to the lkbxa data structure, see
1549 		 * __put_lkb().
1550 		 */
1551 		read_lock_bh(&ls->ls_lkbxa_lock);
1552 		if (kref_read(&lkb->lkb_ref))
1553 			kref_get(&lkb->lkb_ref);
1554 		else
1555 			lkb = NULL;
1556 		read_unlock_bh(&ls->ls_lkbxa_lock);
1557 	}
1558 	rcu_read_unlock();
1559 
1560 	*lkb_ret = lkb;
1561 	return lkb ? 0 : -ENOENT;
1562 }
1563 
1564 static void kill_lkb(struct kref *kref)
1565 {
1566 	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1567 
1568 	/* All work is done after the return from kref_put() so we
1569 	   can release the write_lock before the detach_lkb */
1570 
1571 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1572 }
1573 
1574 /* __put_lkb() is used when an lkb may not have an rsb attached to
1575    it so we need to provide the lockspace explicitly */
1576 
1577 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1578 {
1579 	uint32_t lkid = lkb->lkb_id;
1580 	int rv;
1581 
1582 	rv = dlm_kref_put_write_lock_bh(&lkb->lkb_ref, kill_lkb,
1583 					&ls->ls_lkbxa_lock);
1584 	if (rv) {
1585 		xa_erase(&ls->ls_lkbxa, lkid);
1586 		write_unlock_bh(&ls->ls_lkbxa_lock);
1587 
1588 		detach_lkb(lkb);
1589 
1590 		/* for local/process lkbs, lvbptr points to caller's lksb */
1591 		if (lkb->lkb_lvbptr && is_master_copy(lkb))
1592 			dlm_free_lvb(lkb->lkb_lvbptr);
1593 		dlm_free_lkb(lkb);
1594 	}
1595 
1596 	return rv;
1597 }
1598 
1599 int dlm_put_lkb(struct dlm_lkb *lkb)
1600 {
1601 	struct dlm_ls *ls;
1602 
1603 	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1604 	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1605 
1606 	ls = lkb->lkb_resource->res_ls;
1607 	return __put_lkb(ls, lkb);
1608 }
1609 
1610 /* This is only called to add a reference when the code already holds
1611    a valid reference to the lkb, so there's no need for locking. */
1612 
1613 static inline void hold_lkb(struct dlm_lkb *lkb)
1614 {
1615 	kref_get(&lkb->lkb_ref);
1616 }
1617 
1618 static void unhold_lkb_assert(struct kref *kref)
1619 {
1620 	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1621 
1622 	DLM_ASSERT(false, dlm_print_lkb(lkb););
1623 }
1624 
1625 /* This is called when we need to remove a reference and are certain
1626    it's not the last ref.  e.g. del_lkb is always called between a
1627    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1628    put_lkb would work fine, but would involve unnecessary locking */
1629 
1630 static inline void unhold_lkb(struct dlm_lkb *lkb)
1631 {
1632 	kref_put(&lkb->lkb_ref, unhold_lkb_assert);
1633 }
1634 
1635 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1636 			    int mode)
1637 {
1638 	struct dlm_lkb *lkb = NULL, *iter;
1639 
1640 	list_for_each_entry(iter, head, lkb_statequeue)
1641 		if (iter->lkb_rqmode < mode) {
1642 			lkb = iter;
1643 			list_add_tail(new, &iter->lkb_statequeue);
1644 			break;
1645 		}
1646 
1647 	if (!lkb)
1648 		list_add_tail(new, head);
1649 }
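
/* Illustrative example: with the standard mode values DLM_LOCK_NL(0) <
 * DLM_LOCK_CR(1) < DLM_LOCK_CW(2) < DLM_LOCK_PR(3) < DLM_LOCK_PW(4) <
 * DLM_LOCK_EX(5), lkb_add_ordered() keeps the list sorted from highest to
 * lowest mode, so a grant queue holding EX, PR and NL locks reads
 * EX, PR, NL from head to tail.
 */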
1650 
1651 /* add/remove lkb to rsb's grant/convert/wait queue */
1652 
1653 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1654 {
1655 	kref_get(&lkb->lkb_ref);
1656 
1657 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1658 
1659 	lkb->lkb_timestamp = ktime_get();
1660 
1661 	lkb->lkb_status = status;
1662 
1663 	switch (status) {
1664 	case DLM_LKSTS_WAITING:
1665 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1666 			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1667 		else
1668 			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1669 		break;
1670 	case DLM_LKSTS_GRANTED:
1671 		/* convention says granted locks kept in order of grmode */
1672 		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1673 				lkb->lkb_grmode);
1674 		break;
1675 	case DLM_LKSTS_CONVERT:
1676 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1677 			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1678 		else
1679 			list_add_tail(&lkb->lkb_statequeue,
1680 				      &r->res_convertqueue);
1681 		break;
1682 	default:
1683 		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1684 	}
1685 }
1686 
1687 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1688 {
1689 	lkb->lkb_status = 0;
1690 	list_del(&lkb->lkb_statequeue);
1691 	unhold_lkb(lkb);
1692 }
1693 
1694 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1695 {
1696 	del_lkb(r, lkb);
1697 	add_lkb(r, lkb, sts);
1698 }
1699 
1700 static int msg_reply_type(int mstype)
1701 {
1702 	switch (mstype) {
1703 	case DLM_MSG_REQUEST:
1704 		return DLM_MSG_REQUEST_REPLY;
1705 	case DLM_MSG_CONVERT:
1706 		return DLM_MSG_CONVERT_REPLY;
1707 	case DLM_MSG_UNLOCK:
1708 		return DLM_MSG_UNLOCK_REPLY;
1709 	case DLM_MSG_CANCEL:
1710 		return DLM_MSG_CANCEL_REPLY;
1711 	case DLM_MSG_LOOKUP:
1712 		return DLM_MSG_LOOKUP_REPLY;
1713 	}
1714 	return -1;
1715 }
1716 
1717 /* add/remove lkb from global waiters list of lkb's waiting for
1718    a reply from a remote node */
1719 
1720 static void add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1721 {
1722 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1723 
1724 	spin_lock_bh(&ls->ls_waiters_lock);
1725 	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1726 		switch (mstype) {
1727 		case DLM_MSG_UNLOCK:
1728 			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
1729 			break;
1730 		case DLM_MSG_CANCEL:
1731 			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
1732 			break;
1733 		default:
1734 			/* should never happen as validate_lock_args() checks
1735 			 * on lkb_wait_type and validate_unlock_args() only
1736 			 * creates UNLOCK or CANCEL messages.
1737 			 */
1738 			WARN_ON_ONCE(1);
1739 			goto out;
1740 		}
1741 		lkb->lkb_wait_count++;
1742 		hold_lkb(lkb);
1743 
1744 		log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1745 			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
1746 			  lkb->lkb_wait_count, dlm_iflags_val(lkb));
1747 		goto out;
1748 	}
1749 
1750 	DLM_ASSERT(!lkb->lkb_wait_count,
1751 		   dlm_print_lkb(lkb);
1752 		   printk("wait_count %d\n", lkb->lkb_wait_count););
1753 
1754 	lkb->lkb_wait_count++;
1755 	lkb->lkb_wait_type = mstype;
1756 	lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1757 	hold_lkb(lkb);
1758 	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1759  out:
1760 	spin_unlock_bh(&ls->ls_waiters_lock);
1761 }
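
/* Broadly, an add_to_waiters() call is paired with remove_from_waiters()
 * (or remove_from_waiters_ms()) once the reply arrives.  A rough sketch of
 * the remote request path (details and error handling elided):
 *
 *	add_to_waiters(lkb, DLM_MSG_REQUEST, to_nodeid);
 *	... send the request to the master node ...
 *	... later, when the reply is received: ...
 *	remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
 */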
1762 
1763 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1764    list as part of process_requestqueue (e.g. a lookup that has an optimized
1765    request reply on the requestqueue) between dlm_recover_waiters_pre() which
1766    set RESEND and dlm_recover_waiters_post() */
1767 
1768 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1769 				const struct dlm_message *ms)
1770 {
1771 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1772 	int overlap_done = 0;
1773 
1774 	if (mstype == DLM_MSG_UNLOCK_REPLY &&
1775 	    test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
1776 		log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1777 		overlap_done = 1;
1778 		goto out_del;
1779 	}
1780 
1781 	if (mstype == DLM_MSG_CANCEL_REPLY &&
1782 	    test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1783 		log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1784 		overlap_done = 1;
1785 		goto out_del;
1786 	}
1787 
1788 	/* Cancel state was preemptively cleared by a successful convert,
1789 	   see next comment, nothing to do. */
1790 
1791 	if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1792 	    (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1793 		log_debug(ls, "remwait %x cancel_reply wait_type %d",
1794 			  lkb->lkb_id, lkb->lkb_wait_type);
1795 		return -1;
1796 	}
1797 
1798 	/* Remove for the convert reply, and preemptively remove for the
1799 	   cancel reply.  A convert has been granted while there's still
1800 	   an outstanding cancel on it (the cancel is moot and the result
1801 	   in the cancel reply should be 0).  We preempt the cancel reply
1802 	   because the app gets the convert result and then can follow up
1803 	   with another op, like convert.  This subsequent op would see the
1804 	   lingering state of the cancel and fail with -EBUSY. */
1805 
1806 	if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1807 	    (lkb->lkb_wait_type == DLM_MSG_CONVERT) && ms && !ms->m_result &&
1808 	    test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1809 		log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1810 			  lkb->lkb_id);
1811 		lkb->lkb_wait_type = 0;
1812 		lkb->lkb_wait_count--;
1813 		unhold_lkb(lkb);
1814 		goto out_del;
1815 	}
1816 
1817 	/* N.B. type of reply may not always correspond to type of original
1818 	   msg due to lookup->request optimization, verify others? */
1819 
1820 	if (lkb->lkb_wait_type) {
1821 		lkb->lkb_wait_type = 0;
1822 		goto out_del;
1823 	}
1824 
1825 	log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1826 		  lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
1827 		  lkb->lkb_remid, mstype, dlm_iflags_val(lkb));
1828 	return -1;
1829 
1830  out_del:
1831 	/* the force-unlock/cancel has completed and we haven't recvd a reply
1832 	   to the op that was in progress prior to the unlock/cancel; we
1833 	   give up on any reply to the earlier op.  FIXME: not sure when/how
1834 	   this would happen */
1835 
1836 	if (overlap_done && lkb->lkb_wait_type) {
1837 		log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1838 			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
1839 		lkb->lkb_wait_count--;
1840 		unhold_lkb(lkb);
1841 		lkb->lkb_wait_type = 0;
1842 	}
1843 
1844 	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1845 
1846 	clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
1847 	lkb->lkb_wait_count--;
1848 	if (!lkb->lkb_wait_count)
1849 		list_del_init(&lkb->lkb_wait_reply);
1850 	unhold_lkb(lkb);
1851 	return 0;
1852 }
1853 
1854 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1855 {
1856 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1857 	int error;
1858 
1859 	spin_lock_bh(&ls->ls_waiters_lock);
1860 	error = _remove_from_waiters(lkb, mstype, NULL);
1861 	spin_unlock_bh(&ls->ls_waiters_lock);
1862 	return error;
1863 }
1864 
1865 /* Handles situations where we might be processing a "fake" or "local" reply in
1866  * the recovery context, which stops any locking activity. Only debugfs might
1867  * change the lockspace waiters, but it holds the recovery lock to ensure
1868  * that remove_from_waiters_ms() in the local case is the only user
1869  * manipulating the lockspace waiters in the recovery context.
1870  */
1871 
1872 static int remove_from_waiters_ms(struct dlm_lkb *lkb,
1873 				  const struct dlm_message *ms, bool local)
1874 {
1875 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1876 	int error;
1877 
1878 	if (!local)
1879 		spin_lock_bh(&ls->ls_waiters_lock);
1880 	else
1881 		WARN_ON_ONCE(!rwsem_is_locked(&ls->ls_in_recovery) ||
1882 			     !dlm_locking_stopped(ls));
1883 	error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
1884 	if (!local)
1885 		spin_unlock_bh(&ls->ls_waiters_lock);
1886 	return error;
1887 }
1888 
1889 /* lkb is master or local copy */
1890 
1891 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1892 {
1893 	int b, len = r->res_ls->ls_lvblen;
1894 
1895 	/* b=1 lvb returned to caller
1896 	   b=0 lvb written to rsb or invalidated
1897 	   b=-1 do nothing */
1898 
1899 	b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1900 
1901 	if (b == 1) {
1902 		if (!lkb->lkb_lvbptr)
1903 			return;
1904 
1905 		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1906 			return;
1907 
1908 		if (!r->res_lvbptr)
1909 			return;
1910 
1911 		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1912 		lkb->lkb_lvbseq = r->res_lvbseq;
1913 
1914 	} else if (b == 0) {
1915 		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1916 			rsb_set_flag(r, RSB_VALNOTVALID);
1917 			return;
1918 		}
1919 
1920 		if (!lkb->lkb_lvbptr)
1921 			return;
1922 
1923 		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1924 			return;
1925 
1926 		if (!r->res_lvbptr)
1927 			r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1928 
1929 		if (!r->res_lvbptr)
1930 			return;
1931 
1932 		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1933 		r->res_lvbseq++;
1934 		lkb->lkb_lvbseq = r->res_lvbseq;
1935 		rsb_clear_flag(r, RSB_VALNOTVALID);
1936 	}
1937 
1938 	if (rsb_flag(r, RSB_VALNOTVALID))
1939 		set_bit(DLM_SBF_VALNOTVALID_BIT, &lkb->lkb_sbflags);
1940 }
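
/* Rough intuition for dlm_lvb_operations (a sketch, not the exact table):
 * converting up, e.g. NL->PR or PR->EX, typically gives b=1 so the rsb's
 * LVB is copied back to the caller; converting down from PW or EX, e.g.
 * EX->NL, typically gives b=0 so the caller's LVB is written into the rsb;
 * remaining combinations give b=-1 and leave the LVB untouched.
 */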
1941 
1942 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1943 {
1944 	if (lkb->lkb_grmode < DLM_LOCK_PW)
1945 		return;
1946 
1947 	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1948 		rsb_set_flag(r, RSB_VALNOTVALID);
1949 		return;
1950 	}
1951 
1952 	if (!lkb->lkb_lvbptr)
1953 		return;
1954 
1955 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1956 		return;
1957 
1958 	if (!r->res_lvbptr)
1959 		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1960 
1961 	if (!r->res_lvbptr)
1962 		return;
1963 
1964 	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1965 	r->res_lvbseq++;
1966 	rsb_clear_flag(r, RSB_VALNOTVALID);
1967 }
1968 
1969 /* lkb is process copy (pc) */
1970 
1971 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1972 			    const struct dlm_message *ms)
1973 {
1974 	int b;
1975 
1976 	if (!lkb->lkb_lvbptr)
1977 		return;
1978 
1979 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1980 		return;
1981 
1982 	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1983 	if (b == 1) {
1984 		int len = receive_extralen(ms);
1985 		if (len > r->res_ls->ls_lvblen)
1986 			len = r->res_ls->ls_lvblen;
1987 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1988 		lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
1989 	}
1990 }
1991 
1992 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1993    remove_lock -- used for unlock, removes lkb from granted
1994    revert_lock -- used for cancel, moves lkb from convert to granted
1995    grant_lock  -- used for request and convert, adds lkb to granted or
1996                   moves lkb from convert or waiting to granted
1997 
1998    Each of these is used for master or local copy lkb's.  There is
1999    also a _pc() variation used to make the corresponding change on
2000    a process copy (pc) lkb. */
2001 
2002 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2003 {
2004 	del_lkb(r, lkb);
2005 	lkb->lkb_grmode = DLM_LOCK_IV;
2006 	/* this unhold undoes the original ref from create_lkb()
2007 	   so this leads to the lkb being freed */
2008 	unhold_lkb(lkb);
2009 }
2010 
2011 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2012 {
2013 	set_lvb_unlock(r, lkb);
2014 	_remove_lock(r, lkb);
2015 }
2016 
2017 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2018 {
2019 	_remove_lock(r, lkb);
2020 }
2021 
2022 /* returns: 0 did nothing
2023 	    1 moved lock to granted
2024 	   -1 removed lock */
2025 
2026 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2027 {
2028 	int rv = 0;
2029 
2030 	lkb->lkb_rqmode = DLM_LOCK_IV;
2031 
2032 	switch (lkb->lkb_status) {
2033 	case DLM_LKSTS_GRANTED:
2034 		break;
2035 	case DLM_LKSTS_CONVERT:
2036 		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2037 		rv = 1;
2038 		break;
2039 	case DLM_LKSTS_WAITING:
2040 		del_lkb(r, lkb);
2041 		lkb->lkb_grmode = DLM_LOCK_IV;
2042 		/* this unhold undoes the original ref from create_lkb()
2043 		   so this leads to the lkb being freed */
2044 		unhold_lkb(lkb);
2045 		rv = -1;
2046 		break;
2047 	default:
2048 		log_print("invalid status for revert %d", lkb->lkb_status);
2049 	}
2050 	return rv;
2051 }
2052 
2053 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2054 {
2055 	return revert_lock(r, lkb);
2056 }
2057 
2058 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2059 {
2060 	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2061 		lkb->lkb_grmode = lkb->lkb_rqmode;
2062 		if (lkb->lkb_status)
2063 			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2064 		else
2065 			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2066 	}
2067 
2068 	lkb->lkb_rqmode = DLM_LOCK_IV;
2069 	lkb->lkb_highbast = 0;
2070 }
2071 
2072 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2073 {
2074 	set_lvb_lock(r, lkb);
2075 	_grant_lock(r, lkb);
2076 }
2077 
2078 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2079 			  const struct dlm_message *ms)
2080 {
2081 	set_lvb_lock_pc(r, lkb, ms);
2082 	_grant_lock(r, lkb);
2083 }
2084 
2085 /* called by grant_pending_locks() which means an async grant message must
2086    be sent to the requesting node in addition to granting the lock if the
2087    lkb belongs to a remote node. */
2088 
2089 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
2090 {
2091 	grant_lock(r, lkb);
2092 	if (is_master_copy(lkb))
2093 		send_grant(r, lkb);
2094 	else
2095 		queue_cast(r, lkb, 0);
2096 }
2097 
2098 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2099    change the granted/requested modes.  We're munging things accordingly in
2100    the process copy.
2101    CONVDEADLK: our grmode may have been forced down to NL to resolve a
2102    conversion deadlock
2103    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2104    compatible with other granted locks */
2105 
2106 static void munge_demoted(struct dlm_lkb *lkb)
2107 {
2108 	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2109 		log_print("munge_demoted %x invalid modes gr %d rq %d",
2110 			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2111 		return;
2112 	}
2113 
2114 	lkb->lkb_grmode = DLM_LOCK_NL;
2115 }
2116 
2117 static void munge_altmode(struct dlm_lkb *lkb, const struct dlm_message *ms)
2118 {
2119 	if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
2120 	    ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
2121 		log_print("munge_altmode %x invalid reply type %d",
2122 			  lkb->lkb_id, le32_to_cpu(ms->m_type));
2123 		return;
2124 	}
2125 
2126 	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2127 		lkb->lkb_rqmode = DLM_LOCK_PR;
2128 	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2129 		lkb->lkb_rqmode = DLM_LOCK_CW;
2130 	else {
2131 		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2132 		dlm_print_lkb(lkb);
2133 	}
2134 }
2135 
2136 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2137 {
2138 	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2139 					   lkb_statequeue);
2140 	if (lkb->lkb_id == first->lkb_id)
2141 		return 1;
2142 
2143 	return 0;
2144 }
2145 
2146 /* Check if the given lkb conflicts with another lkb on the queue. */
2147 
2148 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2149 {
2150 	struct dlm_lkb *this;
2151 
2152 	list_for_each_entry(this, head, lkb_statequeue) {
2153 		if (this == lkb)
2154 			continue;
2155 		if (!modes_compat(this, lkb))
2156 			return 1;
2157 	}
2158 	return 0;
2159 }
2160 
2161 /*
2162  * "A conversion deadlock arises with a pair of lock requests in the converting
2163  * queue for one resource.  The granted mode of each lock blocks the requested
2164  * mode of the other lock."
2165  *
2166  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2167  * convert queue from being granted, then deadlk/demote lkb.
2168  *
2169  * Example:
2170  * Granted Queue: empty
2171  * Convert Queue: NL->EX (first lock)
2172  *                PR->EX (second lock)
2173  *
2174  * The first lock can't be granted because of the granted mode of the second
2175  * lock and the second lock can't be granted because it's not first in the
2176  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2177  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2178  * flag set and return DEMOTED in the lksb flags.
2179  *
2180  * Originally, this function detected conv-deadlk in a more limited scope:
2181  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2182  * - if lkb1 was the first entry in the queue (not just earlier), and was
2183  *   blocked by the granted mode of lkb2, and there was nothing on the
2184  *   granted queue preventing lkb1 from being granted immediately, i.e.
2185  *   lkb2 was the only thing preventing lkb1 from being granted.
2186  *
2187  * That second condition meant we'd only say there was conv-deadlk if
2188  * resolving it (by demotion) would lead to the first lock on the convert
2189  * queue being granted right away.  It allowed conversion deadlocks to exist
2190  * between locks on the convert queue while they couldn't be granted anyway.
2191  *
2192  * Now, we detect and take action on conversion deadlocks immediately when
2193  * they're created, even if they may not be immediately consequential.  If
2194  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2195  * mode that would prevent lkb1's conversion from being granted, we do a
2196  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2197  * I think this means that the lkb_is_ahead condition below should always
2198  * be zero, i.e. there will never be conv-deadlk between two locks that are
2199  * both already on the convert queue.
2200  */
2201 
2202 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2203 {
2204 	struct dlm_lkb *lkb1;
2205 	int lkb_is_ahead = 0;
2206 
2207 	list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2208 		if (lkb1 == lkb2) {
2209 			lkb_is_ahead = 1;
2210 			continue;
2211 		}
2212 
2213 		if (!lkb_is_ahead) {
2214 			if (!modes_compat(lkb2, lkb1))
2215 				return 1;
2216 		} else {
2217 			if (!modes_compat(lkb2, lkb1) &&
2218 			    !modes_compat(lkb1, lkb2))
2219 				return 1;
2220 		}
2221 	}
2222 	return 0;
2223 }
2224 
2225 /*
2226  * Return 1 if the lock can be granted, 0 otherwise.
2227  * Also detect and resolve conversion deadlocks.
2228  *
2229  * lkb is the lock to be granted
2230  *
2231  * now is 1 if the function is being called in the context of the
2232  * immediate request, it is 0 if called later, after the lock has been
2233  * queued.
2234  *
2235  * recover is 1 if dlm_recover_grant() is trying to grant conversions
2236  * after recovery.
2237  *
2238  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2239  */
2240 
2241 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2242 			   int recover)
2243 {
2244 	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2245 
2246 	/*
2247 	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
2248 	 * a new request for a NL mode lock being blocked.
2249 	 *
2250 	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2251 	 * request, then it would be granted.  In essence, the use of this flag
2252 	 * tells the Lock Manager to expedite this request by not considering
2253 	 * what may be in the CONVERTING or WAITING queues...  As of this
2254 	 * writing, the EXPEDITE flag can be used only with new requests for NL
2255 	 * mode locks.  This flag is not valid for conversion requests.
2256 	 *
2257 	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
2258 	 * conversion or used with a non-NL requested mode.  We also know an
2259 	 * EXPEDITE request is always granted immediately, so now must always
2260 	 * be 1.  The full condition to grant an expedite request: (now &&
2261 	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2262 	 * therefore be shortened to just checking the flag.
2263 	 */
2264 
2265 	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2266 		return 1;
2267 
2268 	/*
2269 	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2270 	 * added to the remaining conditions.
2271 	 */
2272 
2273 	if (queue_conflict(&r->res_grantqueue, lkb))
2274 		return 0;
2275 
2276 	/*
2277 	 * 6-3: By default, a conversion request is immediately granted if the
2278 	 * requested mode is compatible with the modes of all other granted
2279 	 * locks
2280 	 */
2281 
2282 	if (queue_conflict(&r->res_convertqueue, lkb))
2283 		return 0;
2284 
2285 	/*
2286 	 * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2287 	 * locks for a recovered rsb, on which lkb's have been rebuilt.
2288 	 * The lkb's may have been rebuilt on the queues in a different
2289 	 * order than they were in on the previous master.  So, granting
2290 	 * queued conversions in order after recovery doesn't make sense
2291 	 * since the order hasn't been preserved anyway.  The new order
2292 	 * could also have created a new "in place" conversion deadlock.
2293 	 * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2294 	 * After recovery, there would be no granted locks, and possibly
2295 	 * NL->EX, PR->EX, an in-place conversion deadlock.)  So, after
2296 	 * recovery, grant conversions without considering order.
2297 	 */
2298 
2299 	if (conv && recover)
2300 		return 1;
2301 
2302 	/*
2303 	 * 6-5: But the default algorithm for deciding whether to grant or
2304 	 * queue conversion requests does not by itself guarantee that such
2305 	 * requests are serviced on a "first come first serve" basis.  This, in
2306 	 * turn, can lead to a phenomenon known as "indefinite postponement".
2307 	 *
2308 	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
2309 	 * the system service employed to request a lock conversion.  This flag
2310 	 * forces certain conversion requests to be queued, even if they are
2311 	 * compatible with the granted modes of other locks on the same
2312 	 * resource.  Thus, the use of this flag results in conversion requests
2313 	 * being ordered on a "first come first serve" basis.
2314 	 *
2315 	 * DCT: This condition is all about new conversions being able to occur
2316 	 * "in place" while the lock remains on the granted queue (assuming
2317 	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
2318 	 * doesn't _have_ to go onto the convert queue where it's processed in
2319 	 * order.  The "now" variable is necessary to distinguish converts
2320 	 * being received and processed for the first time now, because once a
2321 	 * convert is moved to the conversion queue the condition below applies
2322 	 * requiring fifo granting.
2323 	 */
2324 
2325 	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2326 		return 1;
2327 
2328 	/*
2329 	 * Even if the convert is compat with all granted locks,
2330 	 * QUECVT forces it behind other locks on the convert queue.
2331 	 */
2332 
2333 	if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2334 		if (list_empty(&r->res_convertqueue))
2335 			return 1;
2336 		else
2337 			return 0;
2338 	}
2339 
2340 	/*
2341 	 * The NOORDER flag is set to avoid the standard vms rules on grant
2342 	 * order.
2343 	 */
2344 
2345 	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2346 		return 1;
2347 
2348 	/*
2349 	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2350 	 * granted until all other conversion requests ahead of it are granted
2351 	 * and/or canceled.
2352 	 */
2353 
2354 	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2355 		return 1;
2356 
2357 	/*
2358 	 * 6-4: By default, a new request is immediately granted only if all
2359 	 * three of the following conditions are satisfied when the request is
2360 	 * issued:
2361 	 * - The queue of ungranted conversion requests for the resource is
2362 	 *   empty.
2363 	 * - The queue of ungranted new requests for the resource is empty.
2364 	 * - The mode of the new request is compatible with the most
2365 	 *   restrictive mode of all granted locks on the resource.
2366 	 */
2367 
2368 	if (now && !conv && list_empty(&r->res_convertqueue) &&
2369 	    list_empty(&r->res_waitqueue))
2370 		return 1;
2371 
2372 	/*
2373 	 * 6-4: Once a lock request is in the queue of ungranted new requests,
2374 	 * it cannot be granted until the queue of ungranted conversion
2375 	 * requests is empty, all ungranted new requests ahead of it are
2376 	 * granted and/or canceled, and it is compatible with the granted mode
2377 	 * of the most restrictive lock granted on the resource.
2378 	 */
2379 
2380 	if (!now && !conv && list_empty(&r->res_convertqueue) &&
2381 	    first_in_list(lkb, &r->res_waitqueue))
2382 		return 1;
2383 
2384 	return 0;
2385 }
2386 
2387 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2388 			  int recover, int *err)
2389 {
2390 	int rv;
2391 	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2392 	int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2393 
2394 	if (err)
2395 		*err = 0;
2396 
2397 	rv = _can_be_granted(r, lkb, now, recover);
2398 	if (rv)
2399 		goto out;
2400 
2401 	/*
2402 	 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2403 	 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2404 	 * cancels one of the locks.
2405 	 */
2406 
2407 	if (is_convert && can_be_queued(lkb) &&
2408 	    conversion_deadlock_detect(r, lkb)) {
2409 		if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2410 			lkb->lkb_grmode = DLM_LOCK_NL;
2411 			set_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
2412 		} else if (err) {
2413 			*err = -EDEADLK;
2414 		} else {
2415 			log_print("can_be_granted deadlock %x now %d",
2416 				  lkb->lkb_id, now);
2417 			dlm_dump_rsb(r);
2418 		}
2419 		goto out;
2420 	}
2421 
2422 	/*
2423 	 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2424 	 * to grant a request in a mode other than the normal rqmode.  It's a
2425 	 * simple way to provide a big optimization to applications that can
2426 	 * use them.
2427 	 */
2428 
2429 	if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2430 		alt = DLM_LOCK_PR;
2431 	else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2432 		alt = DLM_LOCK_CW;
2433 
2434 	if (alt) {
2435 		lkb->lkb_rqmode = alt;
2436 		rv = _can_be_granted(r, lkb, now, 0);
2437 		if (rv)
2438 			set_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
2439 		else
2440 			lkb->lkb_rqmode = rqmode;
2441 	}
2442  out:
2443 	return rv;
2444 }
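
/* Illustrative example of the ALT flags: a request for DLM_LOCK_EX made
 * with DLM_LKF_ALTPR that cannot be granted in EX is retried above in PR;
 * if PR can be granted, the lock is granted in PR and DLM_SBF_ALTMODE is
 * set in the lksb flags so the caller can tell which mode it actually
 * holds.
 */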
2445 
2446 /* Returns the highest requested mode of all blocked conversions; sets
2447    cw if there's a blocked conversion to DLM_LOCK_CW. */
2448 
2449 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2450 				 unsigned int *count)
2451 {
2452 	struct dlm_lkb *lkb, *s;
2453 	int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2454 	int hi, demoted, quit, grant_restart, demote_restart;
2455 	int deadlk;
2456 
2457 	quit = 0;
2458  restart:
2459 	grant_restart = 0;
2460 	demote_restart = 0;
2461 	hi = DLM_LOCK_IV;
2462 
2463 	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2464 		demoted = is_demoted(lkb);
2465 		deadlk = 0;
2466 
2467 		if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2468 			grant_lock_pending(r, lkb);
2469 			grant_restart = 1;
2470 			if (count)
2471 				(*count)++;
2472 			continue;
2473 		}
2474 
2475 		if (!demoted && is_demoted(lkb)) {
2476 			log_print("WARN: pending demoted %x node %d %s",
2477 				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2478 			demote_restart = 1;
2479 			continue;
2480 		}
2481 
2482 		if (deadlk) {
2483 			/*
2484 			 * If the DLM_LKF_NODLCKWT flag is set and a conversion
2485 			 * deadlock is detected, we request a blocking AST and
2486 			 * let the caller demote or cancel the conversion.
2487 			 */
2488 			if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2489 				if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2490 					queue_bast(r, lkb, lkb->lkb_rqmode);
2491 					lkb->lkb_highbast = lkb->lkb_rqmode;
2492 				}
2493 			} else {
2494 				log_print("WARN: pending deadlock %x node %d %s",
2495 					  lkb->lkb_id, lkb->lkb_nodeid,
2496 					  r->res_name);
2497 				dlm_dump_rsb(r);
2498 			}
2499 			continue;
2500 		}
2501 
2502 		hi = max_t(int, lkb->lkb_rqmode, hi);
2503 
2504 		if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2505 			*cw = 1;
2506 	}
2507 
2508 	if (grant_restart)
2509 		goto restart;
2510 	if (demote_restart && !quit) {
2511 		quit = 1;
2512 		goto restart;
2513 	}
2514 
2515 	return max_t(int, high, hi);
2516 }
2517 
2518 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2519 			      unsigned int *count)
2520 {
2521 	struct dlm_lkb *lkb, *s;
2522 
2523 	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2524 		if (can_be_granted(r, lkb, 0, 0, NULL)) {
2525 			grant_lock_pending(r, lkb);
2526 			if (count)
2527 				(*count)++;
2528 		} else {
2529 			high = max_t(int, lkb->lkb_rqmode, high);
2530 			if (lkb->lkb_rqmode == DLM_LOCK_CW)
2531 				*cw = 1;
2532 		}
2533 	}
2534 
2535 	return high;
2536 }
2537 
2538 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2539    on either the convert or waiting queue.
2540    high is the largest rqmode of all locks blocked on the convert or
2541    waiting queue. */
2542 
2543 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2544 {
2545 	if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2546 		if (gr->lkb_highbast < DLM_LOCK_EX)
2547 			return 1;
2548 		return 0;
2549 	}
2550 
2551 	if (gr->lkb_highbast < high &&
2552 	    !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2553 		return 1;
2554 	return 0;
2555 }
2556 
2557 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2558 {
2559 	struct dlm_lkb *lkb, *s;
2560 	int high = DLM_LOCK_IV;
2561 	int cw = 0;
2562 
2563 	if (!is_master(r)) {
2564 		log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2565 		dlm_dump_rsb(r);
2566 		return;
2567 	}
2568 
2569 	high = grant_pending_convert(r, high, &cw, count);
2570 	high = grant_pending_wait(r, high, &cw, count);
2571 
2572 	if (high == DLM_LOCK_IV)
2573 		return;
2574 
2575 	/*
2576 	 * If there are locks left on the wait/convert queue then send blocking
2577 	 * ASTs to granted locks based on the largest requested mode (high)
2578 	 * found above.
2579 	 */
2580 
2581 	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2582 		if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2583 			if (cw && high == DLM_LOCK_PR &&
2584 			    lkb->lkb_grmode == DLM_LOCK_PR)
2585 				queue_bast(r, lkb, DLM_LOCK_CW);
2586 			else
2587 				queue_bast(r, lkb, high);
2588 			lkb->lkb_highbast = high;
2589 		}
2590 	}
2591 }
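
/* Illustrative example of the PR/CW special case above: PR and CW are
 * mutually incompatible, so a granted PR lock never blocks another PR
 * request (high == PR) but does block a queued CW request; in that case
 * the PR holder is sent a bast for DLM_LOCK_CW rather than for PR.
 */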
2592 
2593 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2594 {
2595 	if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2596 	    (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2597 		if (gr->lkb_highbast < DLM_LOCK_EX)
2598 			return 1;
2599 		return 0;
2600 	}
2601 
2602 	if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2603 		return 1;
2604 	return 0;
2605 }
2606 
2607 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2608 			    struct dlm_lkb *lkb)
2609 {
2610 	struct dlm_lkb *gr;
2611 
2612 	list_for_each_entry(gr, head, lkb_statequeue) {
2613 		/* skip self when sending basts to convertqueue */
2614 		if (gr == lkb)
2615 			continue;
2616 		if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2617 			queue_bast(r, gr, lkb->lkb_rqmode);
2618 			gr->lkb_highbast = lkb->lkb_rqmode;
2619 		}
2620 	}
2621 }
2622 
2623 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2624 {
2625 	send_bast_queue(r, &r->res_grantqueue, lkb);
2626 }
2627 
2628 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2629 {
2630 	send_bast_queue(r, &r->res_grantqueue, lkb);
2631 	send_bast_queue(r, &r->res_convertqueue, lkb);
2632 }
2633 
2634 /* set_master(r, lkb) -- set the master nodeid of a resource
2635 
2636    The purpose of this function is to set the nodeid field in the given
2637    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
2638    known, it can just be copied to the lkb and the function will return
2639    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
2640    before it can be copied to the lkb.
2641 
2642    When the rsb nodeid is being looked up remotely, the initial lkb
2643    causing the lookup is kept on the ls_waiters list waiting for the
2644    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
2645    on the rsb's res_lookup list until the master is verified.
2646 
2647    Return values:
2648    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2649    1: the rsb master is not available and the lkb has been placed on
2650       a wait queue
2651 */
2652 
2653 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2654 {
2655 	int our_nodeid = dlm_our_nodeid();
2656 
2657 	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2658 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2659 		r->res_first_lkid = lkb->lkb_id;
2660 		lkb->lkb_nodeid = r->res_nodeid;
2661 		return 0;
2662 	}
2663 
2664 	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2665 		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2666 		return 1;
2667 	}
2668 
2669 	if (r->res_master_nodeid == our_nodeid) {
2670 		lkb->lkb_nodeid = 0;
2671 		return 0;
2672 	}
2673 
2674 	if (r->res_master_nodeid) {
2675 		lkb->lkb_nodeid = r->res_master_nodeid;
2676 		return 0;
2677 	}
2678 
2679 	if (dlm_dir_nodeid(r) == our_nodeid) {
2680 		/* This is a somewhat unusual case; find_rsb will usually
2681 		   have set res_master_nodeid when dir nodeid is local, but
2682 		   there are cases where we become the dir node after we've
2683 		   passed find_rsb and gone through _request_lock again.
2684 		   confirm_master() or process_lookup_list() needs to be
2685 		   called after this. */
2686 		log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2687 			  lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2688 			  r->res_name);
2689 		r->res_master_nodeid = our_nodeid;
2690 		r->res_nodeid = 0;
2691 		lkb->lkb_nodeid = 0;
2692 		return 0;
2693 	}
2694 
2695 	r->res_first_lkid = lkb->lkb_id;
2696 	send_lookup(r, lkb);
2697 	return 1;
2698 }
2699 
2700 static void process_lookup_list(struct dlm_rsb *r)
2701 {
2702 	struct dlm_lkb *lkb, *safe;
2703 
2704 	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2705 		list_del_init(&lkb->lkb_rsb_lookup);
2706 		_request_lock(r, lkb);
2707 	}
2708 }
2709 
2710 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2711 
2712 static void confirm_master(struct dlm_rsb *r, int error)
2713 {
2714 	struct dlm_lkb *lkb;
2715 
2716 	if (!r->res_first_lkid)
2717 		return;
2718 
2719 	switch (error) {
2720 	case 0:
2721 	case -EINPROGRESS:
2722 		r->res_first_lkid = 0;
2723 		process_lookup_list(r);
2724 		break;
2725 
2726 	case -EAGAIN:
2727 	case -EBADR:
2728 	case -ENOTBLK:
2729 		/* the remote request failed and won't be retried (it was
2730 		   a NOQUEUE, or has been canceled/unlocked); make a waiting
2731 		   lkb the first_lkid */
2732 
2733 		r->res_first_lkid = 0;
2734 
2735 		if (!list_empty(&r->res_lookup)) {
2736 			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2737 					 lkb_rsb_lookup);
2738 			list_del_init(&lkb->lkb_rsb_lookup);
2739 			r->res_first_lkid = lkb->lkb_id;
2740 			_request_lock(r, lkb);
2741 		}
2742 		break;
2743 
2744 	default:
2745 		log_error(r->res_ls, "confirm_master unknown error %d", error);
2746 	}
2747 }
2748 
2749 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2750 			 int namelen, void (*ast)(void *astparam),
2751 			 void *astparam,
2752 			 void (*bast)(void *astparam, int mode),
2753 			 struct dlm_args *args)
2754 {
2755 	int rv = -EINVAL;
2756 
2757 	/* check for invalid arg usage */
2758 
2759 	if (mode < 0 || mode > DLM_LOCK_EX)
2760 		goto out;
2761 
2762 	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2763 		goto out;
2764 
2765 	if (flags & DLM_LKF_CANCEL)
2766 		goto out;
2767 
2768 	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2769 		goto out;
2770 
2771 	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2772 		goto out;
2773 
2774 	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2775 		goto out;
2776 
2777 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2778 		goto out;
2779 
2780 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2781 		goto out;
2782 
2783 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2784 		goto out;
2785 
2786 	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2787 		goto out;
2788 
2789 	if (!ast || !lksb)
2790 		goto out;
2791 
2792 	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2793 		goto out;
2794 
2795 	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2796 		goto out;
2797 
2798 	/* these args will be copied to the lkb in validate_lock_args,
2799 	   which cannot be done now because, when converting locks, fields in
2800 	   an active lkb cannot be modified before locking the rsb */
2801 
2802 	args->flags = flags;
2803 	args->astfn = ast;
2804 	args->astparam = astparam;
2805 	args->bastfn = bast;
2806 	args->mode = mode;
2807 	args->lksb = lksb;
2808 	rv = 0;
2809  out:
2810 	return rv;
2811 }
2812 
2813 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2814 {
2815 	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2816  		      DLM_LKF_FORCEUNLOCK))
2817 		return -EINVAL;
2818 
2819 	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2820 		return -EINVAL;
2821 
2822 	args->flags = flags;
2823 	args->astparam = astarg;
2824 	return 0;
2825 }
2826 
2827 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2828 			      struct dlm_args *args)
2829 {
2830 	int rv = -EBUSY;
2831 
2832 	if (args->flags & DLM_LKF_CONVERT) {
2833 		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2834 			goto out;
2835 
2836 		/* lock not allowed if there's any op in progress */
2837 		if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2838 			goto out;
2839 
2840 		if (is_overlap(lkb))
2841 			goto out;
2842 
2843 		rv = -EINVAL;
2844 		if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
2845 			goto out;
2846 
2847 		if (args->flags & DLM_LKF_QUECVT &&
2848 		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2849 			goto out;
2850 	}
2851 
2852 	lkb->lkb_exflags = args->flags;
2853 	dlm_set_sbflags_val(lkb, 0);
2854 	lkb->lkb_astfn = args->astfn;
2855 	lkb->lkb_astparam = args->astparam;
2856 	lkb->lkb_bastfn = args->bastfn;
2857 	lkb->lkb_rqmode = args->mode;
2858 	lkb->lkb_lksb = args->lksb;
2859 	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2860 	lkb->lkb_ownpid = (int) current->pid;
2861 	rv = 0;
2862  out:
2863 	switch (rv) {
2864 	case 0:
2865 		break;
2866 	case -EINVAL:
2867 		/* annoy the user because dlm usage is wrong */
2868 		WARN_ON(1);
2869 		log_error(ls, "%s %d %x %x %x %d %d", __func__,
2870 			  rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2871 			  lkb->lkb_status, lkb->lkb_wait_type);
2872 		break;
2873 	default:
2874 		log_debug(ls, "%s %d %x %x %x %d %d", __func__,
2875 			  rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2876 			  lkb->lkb_status, lkb->lkb_wait_type);
2877 		break;
2878 	}
2879 
2880 	return rv;
2881 }
2882 
2883 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2884    for success */
2885 
2886 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2887    because there may be a lookup in progress and it's valid to do
2888    cancel/unlockf on it */
2889 
2890 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2891 {
2892 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2893 	int rv = -EBUSY;
2894 
2895 	/* normal unlock not allowed if there's any op in progress */
2896 	if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) &&
2897 	    (lkb->lkb_wait_type || lkb->lkb_wait_count))
2898 		goto out;
2899 
2900 	/* an lkb may be waiting for an rsb lookup to complete where the
2901 	   lookup was initiated by another lock */
2902 
2903 	if (!list_empty(&lkb->lkb_rsb_lookup)) {
2904 		if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2905 			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2906 			list_del_init(&lkb->lkb_rsb_lookup);
2907 			queue_cast(lkb->lkb_resource, lkb,
2908 				   args->flags & DLM_LKF_CANCEL ?
2909 				   -DLM_ECANCEL : -DLM_EUNLOCK);
2910 			unhold_lkb(lkb); /* undoes create_lkb() */
2911 		}
2912 		/* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2913 		goto out;
2914 	}
2915 
2916 	rv = -EINVAL;
2917 	if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
2918 		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2919 		dlm_print_lkb(lkb);
2920 		goto out;
2921 	}
2922 
2923 	/* an lkb may still exist even though the lock is EOL'ed due to a
2924 	 * cancel, unlock or failed noqueue request; an app can't use these
2925 	 * locks; return same error as if the lkid had not been found at all
2926 	 */
2927 
2928 	if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
2929 		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2930 		rv = -ENOENT;
2931 		goto out;
2932 	}
2933 
2934 	if (is_overlap_unlock(lkb))
2935 		goto out;
2936 
2937 	/* cancel not allowed with another cancel/unlock in progress */
2938 
2939 	if (args->flags & DLM_LKF_CANCEL) {
2940 		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2941 			goto out;
2942 
2943 		if (is_overlap_cancel(lkb))
2944 			goto out;
2945 
2946 		if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2947 			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2948 			rv = -EBUSY;
2949 			goto out;
2950 		}
2951 
2952 		/* there's nothing to cancel */
2953 		if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2954 		    !lkb->lkb_wait_type) {
2955 			rv = -EBUSY;
2956 			goto out;
2957 		}
2958 
2959 		switch (lkb->lkb_wait_type) {
2960 		case DLM_MSG_LOOKUP:
2961 		case DLM_MSG_REQUEST:
2962 			set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2963 			rv = -EBUSY;
2964 			goto out;
2965 		case DLM_MSG_UNLOCK:
2966 		case DLM_MSG_CANCEL:
2967 			goto out;
2968 		}
2969 		/* add_to_waiters() will set OVERLAP_CANCEL */
2970 		goto out_ok;
2971 	}
2972 
2973 	/* do we need to allow a force-unlock if there's a normal unlock
2974 	   already in progress?  in what conditions could the normal unlock
2975 	   fail such that we'd want to send a force-unlock to be sure? */
2976 
2977 	if (args->flags & DLM_LKF_FORCEUNLOCK) {
2978 		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2979 			goto out;
2980 
2981 		if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2982 			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2983 			rv = -EBUSY;
2984 			goto out;
2985 		}
2986 
2987 		switch (lkb->lkb_wait_type) {
2988 		case DLM_MSG_LOOKUP:
2989 		case DLM_MSG_REQUEST:
2990 			set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2991 			rv = -EBUSY;
2992 			goto out;
2993 		case DLM_MSG_UNLOCK:
2994 			goto out;
2995 		}
2996 		/* add_to_waiters() will set OVERLAP_UNLOCK */
2997 	}
2998 
2999  out_ok:
3000 	/* an overlapping op shouldn't blow away exflags from other op */
3001 	lkb->lkb_exflags |= args->flags;
3002 	dlm_set_sbflags_val(lkb, 0);
3003 	lkb->lkb_astparam = args->astparam;
3004 	rv = 0;
3005  out:
3006 	switch (rv) {
3007 	case 0:
3008 		break;
3009 	case -EINVAL:
3010 		/* annoy the user because dlm usage is wrong */
3011 		WARN_ON(1);
3012 		log_error(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
3013 			  lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
3014 			  args->flags, lkb->lkb_wait_type,
3015 			  lkb->lkb_resource->res_name);
3016 		break;
3017 	default:
3018 		log_debug(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
3019 			  lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
3020 			  args->flags, lkb->lkb_wait_type,
3021 			  lkb->lkb_resource->res_name);
3022 		break;
3023 	}
3024 
3025 	return rv;
3026 }
3027 
3028 /*
3029  * Four stage 4 varieties:
3030  * do_request(), do_convert(), do_unlock(), do_cancel()
3031  * These are called on the master node for the given lock and
3032  * from the central locking logic.
3033  */
3034 
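/* do_request() returns 0 if the lock was granted immediately, -EINPROGRESS
 * if it was placed on the wait queue, and -EAGAIN if it could not be
 * granted and DLM_LKF_NOQUEUE prevented queueing.
 */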
3035 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3036 {
3037 	int error = 0;
3038 
3039 	if (can_be_granted(r, lkb, 1, 0, NULL)) {
3040 		grant_lock(r, lkb);
3041 		queue_cast(r, lkb, 0);
3042 		goto out;
3043 	}
3044 
3045 	if (can_be_queued(lkb)) {
3046 		error = -EINPROGRESS;
3047 		add_lkb(r, lkb, DLM_LKSTS_WAITING);
3048 		goto out;
3049 	}
3050 
3051 	error = -EAGAIN;
3052 	queue_cast(r, lkb, -EAGAIN);
3053  out:
3054 	return error;
3055 }
3056 
3057 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3058 			       int error)
3059 {
3060 	switch (error) {
3061 	case -EAGAIN:
3062 		if (force_blocking_asts(lkb))
3063 			send_blocking_asts_all(r, lkb);
3064 		break;
3065 	case -EINPROGRESS:
3066 		send_blocking_asts(r, lkb);
3067 		break;
3068 	}
3069 }
3070 
3071 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3072 {
3073 	int error = 0;
3074 	int deadlk = 0;
3075 
3076 	/* changing an existing lock may allow others to be granted */
3077 
3078 	if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3079 		grant_lock(r, lkb);
3080 		queue_cast(r, lkb, 0);
3081 		goto out;
3082 	}
3083 
3084 	/* can_be_granted() detected that this lock would block in a conversion
3085 	   deadlock, so we leave it on the granted queue and return EDEADLK in
3086 	   the ast for the convert. */
3087 
3088 	if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
3089 		/* it's left on the granted queue */
3090 		revert_lock(r, lkb);
3091 		queue_cast(r, lkb, -EDEADLK);
3092 		error = -EDEADLK;
3093 		goto out;
3094 	}
3095 
3096 	/* is_demoted() means the can_be_granted() above set the grmode
3097 	   to NL, and left us on the granted queue.  This auto-demotion
3098 	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
3099 	   now grantable.  We have to try to grant other converting locks
3100 	   before we try again to grant this one. */
3101 
3102 	if (is_demoted(lkb)) {
3103 		grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3104 		if (_can_be_granted(r, lkb, 1, 0)) {
3105 			grant_lock(r, lkb);
3106 			queue_cast(r, lkb, 0);
3107 			goto out;
3108 		}
3109 		/* else fall through and move to convert queue */
3110 	}
3111 
3112 	if (can_be_queued(lkb)) {
3113 		error = -EINPROGRESS;
3114 		del_lkb(r, lkb);
3115 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3116 		goto out;
3117 	}
3118 
3119 	error = -EAGAIN;
3120 	queue_cast(r, lkb, -EAGAIN);
3121  out:
3122 	return error;
3123 }
3124 
3125 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3126 			       int error)
3127 {
3128 	switch (error) {
3129 	case 0:
3130 		grant_pending_locks(r, NULL);
3131 		/* grant_pending_locks also sends basts */
3132 		break;
3133 	case -EAGAIN:
3134 		if (force_blocking_asts(lkb))
3135 			send_blocking_asts_all(r, lkb);
3136 		break;
3137 	case -EINPROGRESS:
3138 		send_blocking_asts(r, lkb);
3139 		break;
3140 	}
3141 }
3142 
3143 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3144 {
3145 	remove_lock(r, lkb);
3146 	queue_cast(r, lkb, -DLM_EUNLOCK);
3147 	return -DLM_EUNLOCK;
3148 }
3149 
3150 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3151 			      int error)
3152 {
3153 	grant_pending_locks(r, NULL);
3154 }
3155 
3156 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3157 
3158 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3159 {
3160 	int error;
3161 
3162 	error = revert_lock(r, lkb);
3163 	if (error) {
3164 		queue_cast(r, lkb, -DLM_ECANCEL);
3165 		return -DLM_ECANCEL;
3166 	}
3167 	return 0;
3168 }
3169 
3170 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3171 			      int error)
3172 {
3173 	if (error)
3174 		grant_pending_locks(r, NULL);
3175 }
3176 
3177 /*
3178  * Four stage 3 varieties:
3179  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3180  */
3181 
3182 /* add a new lkb to a possibly new rsb, called by requesting process */
3183 
3184 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3185 {
3186 	int error;
3187 
3188 	/* set_master: sets lkb nodeid from r */
3189 
3190 	error = set_master(r, lkb);
3191 	if (error < 0)
3192 		goto out;
3193 	if (error) {
3194 		error = 0;
3195 		goto out;
3196 	}
3197 
3198 	if (is_remote(r)) {
3199 		/* receive_request() calls do_request() on remote node */
3200 		error = send_request(r, lkb);
3201 	} else {
3202 		error = do_request(r, lkb);
3203 		/* for remote locks the request_reply is sent
3204 		   between do_request and do_request_effects */
3205 		do_request_effects(r, lkb, error);
3206 	}
3207  out:
3208 	return error;
3209 }
3210 
3211 /* change some property of an existing lkb, e.g. mode */
3212 
3213 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3214 {
3215 	int error;
3216 
3217 	if (is_remote(r)) {
3218 		/* receive_convert() calls do_convert() on remote node */
3219 		error = send_convert(r, lkb);
3220 	} else {
3221 		error = do_convert(r, lkb);
3222 		/* for remote locks the convert_reply is sent
3223 		   between do_convert and do_convert_effects */
3224 		do_convert_effects(r, lkb, error);
3225 	}
3226 
3227 	return error;
3228 }
3229 
3230 /* remove an existing lkb from the granted queue */
3231 
3232 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3233 {
3234 	int error;
3235 
3236 	if (is_remote(r)) {
3237 		/* receive_unlock() calls do_unlock() on remote node */
3238 		error = send_unlock(r, lkb);
3239 	} else {
3240 		error = do_unlock(r, lkb);
3241 		/* for remote locks the unlock_reply is sent
3242 		   between do_unlock and do_unlock_effects */
3243 		do_unlock_effects(r, lkb, error);
3244 	}
3245 
3246 	return error;
3247 }
3248 
3249 /* remove an existing lkb from the convert or wait queue */
3250 
3251 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3252 {
3253 	int error;
3254 
3255 	if (is_remote(r)) {
3256 		/* receive_cancel() calls do_cancel() on remote node */
3257 		error = send_cancel(r, lkb);
3258 	} else {
3259 		error = do_cancel(r, lkb);
3260 		/* for remote locks the cancel_reply is sent
3261 		   between do_cancel and do_cancel_effects */
3262 		do_cancel_effects(r, lkb, error);
3263 	}
3264 
3265 	return error;
3266 }
3267 
3268 /*
3269  * Four stage 2 varieties:
3270  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3271  */
3272 
3273 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3274 			const void *name, int len,
3275 			struct dlm_args *args)
3276 {
3277 	struct dlm_rsb *r;
3278 	int error;
3279 
3280 	error = validate_lock_args(ls, lkb, args);
3281 	if (error)
3282 		return error;
3283 
3284 	error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3285 	if (error)
3286 		return error;
3287 
3288 	lock_rsb(r);
3289 
3290 	attach_lkb(r, lkb);
3291 	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3292 
3293 	error = _request_lock(r, lkb);
3294 
3295 	unlock_rsb(r);
3296 	put_rsb(r);
3297 	return error;
3298 }
3299 
3300 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3301 			struct dlm_args *args)
3302 {
3303 	struct dlm_rsb *r;
3304 	int error;
3305 
3306 	r = lkb->lkb_resource;
3307 
3308 	hold_rsb(r);
3309 	lock_rsb(r);
3310 
3311 	error = validate_lock_args(ls, lkb, args);
3312 	if (error)
3313 		goto out;
3314 
3315 	error = _convert_lock(r, lkb);
3316  out:
3317 	unlock_rsb(r);
3318 	put_rsb(r);
3319 	return error;
3320 }
3321 
3322 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3323 		       struct dlm_args *args)
3324 {
3325 	struct dlm_rsb *r;
3326 	int error;
3327 
3328 	r = lkb->lkb_resource;
3329 
3330 	hold_rsb(r);
3331 	lock_rsb(r);
3332 
3333 	error = validate_unlock_args(lkb, args);
3334 	if (error)
3335 		goto out;
3336 
3337 	error = _unlock_lock(r, lkb);
3338  out:
3339 	unlock_rsb(r);
3340 	put_rsb(r);
3341 	return error;
3342 }
3343 
3344 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3345 		       struct dlm_args *args)
3346 {
3347 	struct dlm_rsb *r;
3348 	int error;
3349 
3350 	r = lkb->lkb_resource;
3351 
3352 	hold_rsb(r);
3353 	lock_rsb(r);
3354 
3355 	error = validate_unlock_args(lkb, args);
3356 	if (error)
3357 		goto out;
3358 
3359 	error = _cancel_lock(r, lkb);
3360  out:
3361 	unlock_rsb(r);
3362 	put_rsb(r);
3363 	return error;
3364 }
3365 
3366 /*
3367  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
3368  */
3369 
3370 int dlm_lock(dlm_lockspace_t *lockspace,
3371 	     int mode,
3372 	     struct dlm_lksb *lksb,
3373 	     uint32_t flags,
3374 	     const void *name,
3375 	     unsigned int namelen,
3376 	     uint32_t parent_lkid,
3377 	     void (*ast) (void *astarg),
3378 	     void *astarg,
3379 	     void (*bast) (void *astarg, int mode))
3380 {
3381 	struct dlm_ls *ls;
3382 	struct dlm_lkb *lkb;
3383 	struct dlm_args args;
3384 	int error, convert = flags & DLM_LKF_CONVERT;
3385 
3386 	ls = dlm_find_lockspace_local(lockspace);
3387 	if (!ls)
3388 		return -EINVAL;
3389 
3390 	dlm_lock_recovery(ls);
3391 
3392 	if (convert)
3393 		error = find_lkb(ls, lksb->sb_lkid, &lkb);
3394 	else
3395 		error = create_lkb(ls, &lkb);
3396 
3397 	if (error)
3398 		goto out;
3399 
3400 	trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
3401 
3402 	error = set_lock_args(mode, lksb, flags, namelen, ast, astarg, bast,
3403 			      &args);
3404 	if (error)
3405 		goto out_put;
3406 
3407 	if (convert)
3408 		error = convert_lock(ls, lkb, &args);
3409 	else
3410 		error = request_lock(ls, lkb, name, namelen, &args);
3411 
3412 	if (error == -EINPROGRESS)
3413 		error = 0;
3414  out_put:
3415 	trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true);
3416 
3417 	if (convert || error)
3418 		__put_lkb(ls, lkb);
3419 	if (error == -EAGAIN || error == -EDEADLK)
3420 		error = 0;
3421  out:
3422 	dlm_unlock_recovery(ls);
3423 	dlm_put_lockspace(ls);
3424 	return error;
3425 }
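
/* A rough usage sketch of the API entry point above (hypothetical caller
   code, not part of this file): a kernel user that has created a
   lockspace with dlm_new_lockspace() might request an exclusive lock on
   a named resource like this; the final status is delivered to the ast
   in my_lksb.sb_status and the lock id in my_lksb.sb_lkid:

	static struct dlm_lksb my_lksb;
	static void my_ast(void *astarg) { ... }

	error = dlm_lock(ls, DLM_LOCK_EX, &my_lksb, 0,
			 "my_resource", strlen("my_resource"), 0,
			 my_ast, NULL, NULL);

   A later call with DLM_LKF_CONVERT and the same lksb takes the
   convert_lock() branch above instead of request_lock(). */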
3426 
3427 int dlm_unlock(dlm_lockspace_t *lockspace,
3428 	       uint32_t lkid,
3429 	       uint32_t flags,
3430 	       struct dlm_lksb *lksb,
3431 	       void *astarg)
3432 {
3433 	struct dlm_ls *ls;
3434 	struct dlm_lkb *lkb;
3435 	struct dlm_args args;
3436 	int error;
3437 
3438 	ls = dlm_find_lockspace_local(lockspace);
3439 	if (!ls)
3440 		return -EINVAL;
3441 
3442 	dlm_lock_recovery(ls);
3443 
3444 	error = find_lkb(ls, lkid, &lkb);
3445 	if (error)
3446 		goto out;
3447 
3448 	trace_dlm_unlock_start(ls, lkb, flags);
3449 
3450 	error = set_unlock_args(flags, astarg, &args);
3451 	if (error)
3452 		goto out_put;
3453 
3454 	if (flags & DLM_LKF_CANCEL)
3455 		error = cancel_lock(ls, lkb, &args);
3456 	else
3457 		error = unlock_lock(ls, lkb, &args);
3458 
3459 	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3460 		error = 0;
3461 	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3462 		error = 0;
3463  out_put:
3464 	trace_dlm_unlock_end(ls, lkb, flags, error);
3465 
3466 	dlm_put_lkb(lkb);
3467  out:
3468 	dlm_unlock_recovery(ls);
3469 	dlm_put_lockspace(ls);
3470 	return error;
3471 }
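
/* And a matching sketch for the teardown side (again hypothetical caller
   code): the lock id saved in the lksb is used to release the lock, or,
   with DLM_LKF_CANCEL, to cancel a request or conversion still in
   progress (the cancel_lock() branch above):

	error = dlm_unlock(ls, my_lksb.sb_lkid, 0, &my_lksb, NULL);
	error = dlm_unlock(ls, my_lksb.sb_lkid, DLM_LKF_CANCEL, &my_lksb, NULL);

   Completion is again reported through the ast, with -DLM_EUNLOCK or
   -DLM_ECANCEL as the status. */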
3472 
3473 /*
3474  * send/receive routines for remote operations and replies
3475  *
3476  * send_args
3477  * send_common
3478  * send_request			receive_request
3479  * send_convert			receive_convert
3480  * send_unlock			receive_unlock
3481  * send_cancel			receive_cancel
3482  * send_grant			receive_grant
3483  * send_bast			receive_bast
3484  * send_lookup			receive_lookup
3485  * send_remove			receive_remove
3486  *
3487  * 				send_common_reply
3488  * receive_request_reply	send_request_reply
3489  * receive_convert_reply	send_convert_reply
3490  * receive_unlock_reply		send_unlock_reply
3491  * receive_cancel_reply		send_cancel_reply
3492  * receive_lookup_reply		send_lookup_reply
3493  */
3494 
3495 static int _create_message(struct dlm_ls *ls, int mb_len,
3496 			   int to_nodeid, int mstype,
3497 			   struct dlm_message **ms_ret,
3498 			   struct dlm_mhandle **mh_ret)
3499 {
3500 	struct dlm_message *ms;
3501 	struct dlm_mhandle *mh;
3502 	char *mb;
3503 
3504 	/* get_buffer gives us a message handle (mh) that we need to
3505 	   pass into midcomms_commit and a message buffer (mb) that we
3506 	   write our data into */
3507 
3508 	mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb);
3509 	if (!mh)
3510 		return -ENOBUFS;
3511 
3512 	ms = (struct dlm_message *) mb;
3513 
3514 	ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3515 	ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
3516 	ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
3517 	ms->m_header.h_length = cpu_to_le16(mb_len);
3518 	ms->m_header.h_cmd = DLM_MSG;
3519 
3520 	ms->m_type = cpu_to_le32(mstype);
3521 
3522 	*mh_ret = mh;
3523 	*ms_ret = ms;
3524 	return 0;
3525 }
3526 
3527 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3528 			  int to_nodeid, int mstype,
3529 			  struct dlm_message **ms_ret,
3530 			  struct dlm_mhandle **mh_ret)
3531 {
3532 	int mb_len = sizeof(struct dlm_message);
3533 
3534 	switch (mstype) {
3535 	case DLM_MSG_REQUEST:
3536 	case DLM_MSG_LOOKUP:
3537 	case DLM_MSG_REMOVE:
3538 		mb_len += r->res_length;
3539 		break;
3540 	case DLM_MSG_CONVERT:
3541 	case DLM_MSG_UNLOCK:
3542 	case DLM_MSG_REQUEST_REPLY:
3543 	case DLM_MSG_CONVERT_REPLY:
3544 	case DLM_MSG_GRANT:
3545 		if (lkb && lkb->lkb_lvbptr && (lkb->lkb_exflags & DLM_LKF_VALBLK))
3546 			mb_len += r->res_ls->ls_lvblen;
3547 		break;
3548 	}
3549 
3550 	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3551 			       ms_ret, mh_ret);
3552 }
3553 
3554 /* further lowcomms enhancements or alternate implementations may make
3555    the return value from this function useful at some point */
3556 
3557 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms,
3558 			const void *name, int namelen)
3559 {
3560 	dlm_midcomms_commit_mhandle(mh, name, namelen);
3561 	return 0;
3562 }
3563 
3564 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3565 		      struct dlm_message *ms)
3566 {
3567 	ms->m_nodeid   = cpu_to_le32(lkb->lkb_nodeid);
3568 	ms->m_pid      = cpu_to_le32(lkb->lkb_ownpid);
3569 	ms->m_lkid     = cpu_to_le32(lkb->lkb_id);
3570 	ms->m_remid    = cpu_to_le32(lkb->lkb_remid);
3571 	ms->m_exflags  = cpu_to_le32(lkb->lkb_exflags);
3572 	ms->m_sbflags  = cpu_to_le32(dlm_sbflags_val(lkb));
3573 	ms->m_flags    = cpu_to_le32(dlm_dflags_val(lkb));
3574 	ms->m_lvbseq   = cpu_to_le32(lkb->lkb_lvbseq);
3575 	ms->m_status   = cpu_to_le32(lkb->lkb_status);
3576 	ms->m_grmode   = cpu_to_le32(lkb->lkb_grmode);
3577 	ms->m_rqmode   = cpu_to_le32(lkb->lkb_rqmode);
3578 	ms->m_hash     = cpu_to_le32(r->res_hash);
3579 
3580 	/* m_result and m_bastmode are set from function args,
3581 	   not from lkb fields */
3582 
3583 	if (lkb->lkb_bastfn)
3584 		ms->m_asts |= cpu_to_le32(DLM_CB_BAST);
3585 	if (lkb->lkb_astfn)
3586 		ms->m_asts |= cpu_to_le32(DLM_CB_CAST);
3587 
3588 	/* compare with switch in create_message; send_remove() doesn't
3589 	   use send_args() */
3590 
3591 	switch (ms->m_type) {
3592 	case cpu_to_le32(DLM_MSG_REQUEST):
3593 	case cpu_to_le32(DLM_MSG_LOOKUP):
3594 		memcpy(ms->m_extra, r->res_name, r->res_length);
3595 		break;
3596 	case cpu_to_le32(DLM_MSG_CONVERT):
3597 	case cpu_to_le32(DLM_MSG_UNLOCK):
3598 	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3599 	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3600 	case cpu_to_le32(DLM_MSG_GRANT):
3601 		if (!lkb->lkb_lvbptr || !(lkb->lkb_exflags & DLM_LKF_VALBLK))
3602 			break;
3603 		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3604 		break;
3605 	}
3606 }
3607 
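/* The send_xxxx() routines below share one pattern: create_message()
   sizes and allocates the midcomms buffer, send_args() copies the lkb
   and rsb state into it, the caller fills the few per-message fields
   (e.g. send_grant() sets m_result, send_bast() sets m_bastmode), and
   send_message() commits the mhandle.  send_common() additionally puts
   the lkb on the waiters list before building the message and takes it
   back off, keyed on the expected reply type, if anything fails. */
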
3608 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3609 {
3610 	struct dlm_message *ms;
3611 	struct dlm_mhandle *mh;
3612 	int to_nodeid, error;
3613 
3614 	to_nodeid = r->res_nodeid;
3615 
3616 	add_to_waiters(lkb, mstype, to_nodeid);
3617 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3618 	if (error)
3619 		goto fail;
3620 
3621 	send_args(r, lkb, ms);
3622 
3623 	error = send_message(mh, ms, r->res_name, r->res_length);
3624 	if (error)
3625 		goto fail;
3626 	return 0;
3627 
3628  fail:
3629 	remove_from_waiters(lkb, msg_reply_type(mstype));
3630 	return error;
3631 }
3632 
3633 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3634 {
3635 	return send_common(r, lkb, DLM_MSG_REQUEST);
3636 }
3637 
3638 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3639 {
3640 	int error;
3641 
3642 	error = send_common(r, lkb, DLM_MSG_CONVERT);
3643 
3644 	/* down conversions go without a reply from the master */
3645 	if (!error && down_conversion(lkb)) {
3646 		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3647 		r->res_ls->ls_local_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
3648 		r->res_ls->ls_local_ms.m_result = 0;
3649 		__receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms, true);
3650 	}
3651 
3652 	return error;
3653 }
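
/* Why the fake reply above: the master grants a down-conversion
   immediately and sends no convert_reply back (see the reply =
   !down_conversion() check in receive_convert()), so the local node
   completes the lkb itself by feeding a zero-result DLM_MSG_CONVERT_REPLY
   built in ls_local_ms into __receive_convert_reply(). */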
3654 
3655 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3656    MASTER_UNCERTAIN to force the next request on the rsb to confirm
3657    that the master is still correct. */
3658 
3659 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3660 {
3661 	return send_common(r, lkb, DLM_MSG_UNLOCK);
3662 }
3663 
3664 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3665 {
3666 	return send_common(r, lkb, DLM_MSG_CANCEL);
3667 }
3668 
3669 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3670 {
3671 	struct dlm_message *ms;
3672 	struct dlm_mhandle *mh;
3673 	int to_nodeid, error;
3674 
3675 	to_nodeid = lkb->lkb_nodeid;
3676 
3677 	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3678 	if (error)
3679 		goto out;
3680 
3681 	send_args(r, lkb, ms);
3682 
3683 	ms->m_result = 0;
3684 
3685 	error = send_message(mh, ms, r->res_name, r->res_length);
3686  out:
3687 	return error;
3688 }
3689 
3690 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3691 {
3692 	struct dlm_message *ms;
3693 	struct dlm_mhandle *mh;
3694 	int to_nodeid, error;
3695 
3696 	to_nodeid = lkb->lkb_nodeid;
3697 
3698 	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3699 	if (error)
3700 		goto out;
3701 
3702 	send_args(r, lkb, ms);
3703 
3704 	ms->m_bastmode = cpu_to_le32(mode);
3705 
3706 	error = send_message(mh, ms, r->res_name, r->res_length);
3707  out:
3708 	return error;
3709 }
3710 
3711 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3712 {
3713 	struct dlm_message *ms;
3714 	struct dlm_mhandle *mh;
3715 	int to_nodeid, error;
3716 
3717 	to_nodeid = dlm_dir_nodeid(r);
3718 
3719 	add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3720 	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3721 	if (error)
3722 		goto fail;
3723 
3724 	send_args(r, lkb, ms);
3725 
3726 	error = send_message(mh, ms, r->res_name, r->res_length);
3727 	if (error)
3728 		goto fail;
3729 	return 0;
3730 
3731  fail:
3732 	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3733 	return error;
3734 }
3735 
3736 static int send_remove(struct dlm_rsb *r)
3737 {
3738 	struct dlm_message *ms;
3739 	struct dlm_mhandle *mh;
3740 	int to_nodeid, error;
3741 
3742 	to_nodeid = dlm_dir_nodeid(r);
3743 
3744 	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3745 	if (error)
3746 		goto out;
3747 
3748 	memcpy(ms->m_extra, r->res_name, r->res_length);
3749 	ms->m_hash = cpu_to_le32(r->res_hash);
3750 
3751 	error = send_message(mh, ms, r->res_name, r->res_length);
3752  out:
3753 	return error;
3754 }
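
/* Unlike the other send_xxxx() routines, send_remove() is aimed at the
   resource's directory node rather than the master, carries only the
   resource name and hash, and expects no reply, so nothing is added to
   the waiters list. */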
3755 
3756 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3757 			     int mstype, int rv)
3758 {
3759 	struct dlm_message *ms;
3760 	struct dlm_mhandle *mh;
3761 	int to_nodeid, error;
3762 
3763 	to_nodeid = lkb->lkb_nodeid;
3764 
3765 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3766 	if (error)
3767 		goto out;
3768 
3769 	send_args(r, lkb, ms);
3770 
3771 	ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3772 
3773 	error = send_message(mh, ms, r->res_name, r->res_length);
3774  out:
3775 	return error;
3776 }
3777 
3778 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3779 {
3780 	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3781 }
3782 
3783 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3784 {
3785 	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3786 }
3787 
3788 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3789 {
3790 	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3791 }
3792 
3793 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3794 {
3795 	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3796 }
3797 
3798 static int send_lookup_reply(struct dlm_ls *ls,
3799 			     const struct dlm_message *ms_in, int ret_nodeid,
3800 			     int rv)
3801 {
3802 	struct dlm_rsb *r = &ls->ls_local_rsb;
3803 	struct dlm_message *ms;
3804 	struct dlm_mhandle *mh;
3805 	int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
3806 
3807 	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3808 	if (error)
3809 		goto out;
3810 
3811 	ms->m_lkid = ms_in->m_lkid;
3812 	ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3813 	ms->m_nodeid = cpu_to_le32(ret_nodeid);
3814 
3815 	error = send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in));
3816  out:
3817 	return error;
3818 }
3819 
3820 /* which args we save from a received message depends heavily on the type
3821    of message, unlike the send side where we can safely send everything about
3822    the lkb for any type of message */
3823 
3824 static void receive_flags(struct dlm_lkb *lkb, const struct dlm_message *ms)
3825 {
3826 	lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
3827 	dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3828 	dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3829 }
3830 
3831 static void receive_flags_reply(struct dlm_lkb *lkb,
3832 				const struct dlm_message *ms,
3833 				bool local)
3834 {
3835 	if (local)
3836 		return;
3837 
3838 	dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3839 	dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3840 }
3841 
3842 static int receive_extralen(const struct dlm_message *ms)
3843 {
3844 	return (le16_to_cpu(ms->m_header.h_length) -
3845 		sizeof(struct dlm_message));
3846 }
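
/* Worked example: for a DLM_MSG_REQUEST, create_message() sized the
   buffer as sizeof(struct dlm_message) + res_length, so the value
   returned here is the length of the resource name carried in m_extra;
   for the LVB-carrying messages it is at most ls_lvblen. */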
3847 
3848 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3849 		       const struct dlm_message *ms)
3850 {
3851 	int len;
3852 
3853 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3854 		if (!lkb->lkb_lvbptr)
3855 			lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3856 		if (!lkb->lkb_lvbptr)
3857 			return -ENOMEM;
3858 		len = receive_extralen(ms);
3859 		if (len > ls->ls_lvblen)
3860 			len = ls->ls_lvblen;
3861 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3862 	}
3863 	return 0;
3864 }
3865 
3866 static void fake_bastfn(void *astparam, int mode)
3867 {
3868 	log_print("fake_bastfn should not be called");
3869 }
3870 
3871 static void fake_astfn(void *astparam)
3872 {
3873 	log_print("fake_astfn should not be called");
3874 }
3875 
3876 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3877 				const struct dlm_message *ms)
3878 {
3879 	lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3880 	lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
3881 	lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3882 	lkb->lkb_grmode = DLM_LOCK_IV;
3883 	lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3884 
3885 	lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;
3886 	lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL;
3887 
3888 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3889 		/* lkb was just created so there won't be an lvb yet */
3890 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3891 		if (!lkb->lkb_lvbptr)
3892 			return -ENOMEM;
3893 	}
3894 
3895 	return 0;
3896 }
3897 
3898 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3899 				const struct dlm_message *ms)
3900 {
3901 	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3902 		return -EBUSY;
3903 
3904 	if (receive_lvb(ls, lkb, ms))
3905 		return -ENOMEM;
3906 
3907 	lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3908 	lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
3909 
3910 	return 0;
3911 }
3912 
3913 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3914 			       const struct dlm_message *ms)
3915 {
3916 	if (receive_lvb(ls, lkb, ms))
3917 		return -ENOMEM;
3918 	return 0;
3919 }
3920 
3921 /* We fill in the local-lkb fields with the info that send_xxxx_reply()
3922    uses to send a reply and that the remote end uses to process the reply. */
3923 
3924 static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms)
3925 {
3926 	struct dlm_lkb *lkb = &ls->ls_local_lkb;
3927 	lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3928 	lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3929 }
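
/* The fail: paths in the receive_xxxx() handlers below rely on this:
   when no real lkb can be found or created for an incoming message, the
   per-lockspace ls_local_lkb and ls_local_rsb are filled in just enough
   for send_xxxx_reply() to return the error to the sending node. */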
3930 
3931 /* This is called after the rsb is locked so that we can safely inspect
3932    fields in the lkb. */
3933 
3934 static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
3935 {
3936 	int from = le32_to_cpu(ms->m_header.h_nodeid);
3937 	int error = 0;
3938 
3939 	/* currently mixing of user/kernel locks are not supported */
3940 	if (ms->m_flags & cpu_to_le32(BIT(DLM_DFL_USER_BIT)) &&
3941 	    !test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
3942 		log_error(lkb->lkb_resource->res_ls,
3943 			  "got user dlm message for a kernel lock");
3944 		error = -EINVAL;
3945 		goto out;
3946 	}
3947 
3948 	switch (ms->m_type) {
3949 	case cpu_to_le32(DLM_MSG_CONVERT):
3950 	case cpu_to_le32(DLM_MSG_UNLOCK):
3951 	case cpu_to_le32(DLM_MSG_CANCEL):
3952 		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3953 			error = -EINVAL;
3954 		break;
3955 
3956 	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3957 	case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
3958 	case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
3959 	case cpu_to_le32(DLM_MSG_GRANT):
3960 	case cpu_to_le32(DLM_MSG_BAST):
3961 		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3962 			error = -EINVAL;
3963 		break;
3964 
3965 	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3966 		if (!is_process_copy(lkb))
3967 			error = -EINVAL;
3968 		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3969 			error = -EINVAL;
3970 		break;
3971 
3972 	default:
3973 		error = -EINVAL;
3974 	}
3975 
3976 out:
3977 	if (error)
3978 		log_error(lkb->lkb_resource->res_ls,
3979 			  "ignore invalid message %d from %d %x %x %x %d",
3980 			  le32_to_cpu(ms->m_type), from, lkb->lkb_id,
3981 			  lkb->lkb_remid, dlm_iflags_val(lkb),
3982 			  lkb->lkb_nodeid);
3983 	return error;
3984 }
3985 
3986 static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms)
3987 {
3988 	struct dlm_lkb *lkb;
3989 	struct dlm_rsb *r;
3990 	int from_nodeid;
3991 	int error, namelen = 0;
3992 
3993 	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3994 
3995 	error = create_lkb(ls, &lkb);
3996 	if (error)
3997 		goto fail;
3998 
3999 	receive_flags(lkb, ms);
4000 	set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
4001 	error = receive_request_args(ls, lkb, ms);
4002 	if (error) {
4003 		__put_lkb(ls, lkb);
4004 		goto fail;
4005 	}
4006 
4007 	/* The dir node is the authority on whether we are the master
4008 	   for this rsb or not, so if the master sends us a request, we should
4009 	   recreate the rsb if we've destroyed it.   This race happens when we
4010 	   send a remove message to the dir node at the same time that the dir
4011 	   node sends us a request for the rsb. */
4012 
4013 	namelen = receive_extralen(ms);
4014 
4015 	error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4016 			 R_RECEIVE_REQUEST, &r);
4017 	if (error) {
4018 		__put_lkb(ls, lkb);
4019 		goto fail;
4020 	}
4021 
4022 	lock_rsb(r);
4023 
4024 	if (r->res_master_nodeid != dlm_our_nodeid()) {
4025 		error = validate_master_nodeid(ls, r, from_nodeid);
4026 		if (error) {
4027 			unlock_rsb(r);
4028 			put_rsb(r);
4029 			__put_lkb(ls, lkb);
4030 			goto fail;
4031 		}
4032 	}
4033 
4034 	attach_lkb(r, lkb);
4035 	error = do_request(r, lkb);
4036 	send_request_reply(r, lkb, error);
4037 	do_request_effects(r, lkb, error);
4038 
4039 	unlock_rsb(r);
4040 	put_rsb(r);
4041 
4042 	if (error == -EINPROGRESS)
4043 		error = 0;
4044 	if (error)
4045 		dlm_put_lkb(lkb);
4046 	return 0;
4047 
4048  fail:
4049 	/* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4050 	   and do this receive_request again from process_lookup_list once
4051 	   we get the lookup reply.  This would avoid many repeated
4052 	   ENOTBLK request failures when the lookup reply designating us
4053 	   as master is delayed. */
4054 
4055 	if (error != -ENOTBLK) {
4056 		log_limit(ls, "receive_request %x from %d %d",
4057 			  le32_to_cpu(ms->m_lkid), from_nodeid, error);
4058 	}
4059 
4060 	setup_local_lkb(ls, ms);
4061 	send_request_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4062 	return error;
4063 }
4064 
4065 static int receive_convert(struct dlm_ls *ls, const struct dlm_message *ms)
4066 {
4067 	struct dlm_lkb *lkb;
4068 	struct dlm_rsb *r;
4069 	int error, reply = 1;
4070 
4071 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4072 	if (error)
4073 		goto fail;
4074 
4075 	if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4076 		log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4077 			  "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4078 			  (unsigned long long)lkb->lkb_recover_seq,
4079 			  le32_to_cpu(ms->m_header.h_nodeid),
4080 			  le32_to_cpu(ms->m_lkid));
4081 		error = -ENOENT;
4082 		dlm_put_lkb(lkb);
4083 		goto fail;
4084 	}
4085 
4086 	r = lkb->lkb_resource;
4087 
4088 	hold_rsb(r);
4089 	lock_rsb(r);
4090 
4091 	error = validate_message(lkb, ms);
4092 	if (error)
4093 		goto out;
4094 
4095 	receive_flags(lkb, ms);
4096 
4097 	error = receive_convert_args(ls, lkb, ms);
4098 	if (error) {
4099 		send_convert_reply(r, lkb, error);
4100 		goto out;
4101 	}
4102 
4103 	reply = !down_conversion(lkb);
4104 
4105 	error = do_convert(r, lkb);
4106 	if (reply)
4107 		send_convert_reply(r, lkb, error);
4108 	do_convert_effects(r, lkb, error);
4109  out:
4110 	unlock_rsb(r);
4111 	put_rsb(r);
4112 	dlm_put_lkb(lkb);
4113 	return 0;
4114 
4115  fail:
4116 	setup_local_lkb(ls, ms);
4117 	send_convert_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4118 	return error;
4119 }
4120 
4121 static int receive_unlock(struct dlm_ls *ls, const struct dlm_message *ms)
4122 {
4123 	struct dlm_lkb *lkb;
4124 	struct dlm_rsb *r;
4125 	int error;
4126 
4127 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4128 	if (error)
4129 		goto fail;
4130 
4131 	if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4132 		log_error(ls, "receive_unlock %x remid %x remote %d %x",
4133 			  lkb->lkb_id, lkb->lkb_remid,
4134 			  le32_to_cpu(ms->m_header.h_nodeid),
4135 			  le32_to_cpu(ms->m_lkid));
4136 		error = -ENOENT;
4137 		dlm_put_lkb(lkb);
4138 		goto fail;
4139 	}
4140 
4141 	r = lkb->lkb_resource;
4142 
4143 	hold_rsb(r);
4144 	lock_rsb(r);
4145 
4146 	error = validate_message(lkb, ms);
4147 	if (error)
4148 		goto out;
4149 
4150 	receive_flags(lkb, ms);
4151 
4152 	error = receive_unlock_args(ls, lkb, ms);
4153 	if (error) {
4154 		send_unlock_reply(r, lkb, error);
4155 		goto out;
4156 	}
4157 
4158 	error = do_unlock(r, lkb);
4159 	send_unlock_reply(r, lkb, error);
4160 	do_unlock_effects(r, lkb, error);
4161  out:
4162 	unlock_rsb(r);
4163 	put_rsb(r);
4164 	dlm_put_lkb(lkb);
4165 	return 0;
4166 
4167  fail:
4168 	setup_local_lkb(ls, ms);
4169 	send_unlock_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4170 	return error;
4171 }
4172 
4173 static int receive_cancel(struct dlm_ls *ls, const struct dlm_message *ms)
4174 {
4175 	struct dlm_lkb *lkb;
4176 	struct dlm_rsb *r;
4177 	int error;
4178 
4179 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4180 	if (error)
4181 		goto fail;
4182 
4183 	receive_flags(lkb, ms);
4184 
4185 	r = lkb->lkb_resource;
4186 
4187 	hold_rsb(r);
4188 	lock_rsb(r);
4189 
4190 	error = validate_message(lkb, ms);
4191 	if (error)
4192 		goto out;
4193 
4194 	error = do_cancel(r, lkb);
4195 	send_cancel_reply(r, lkb, error);
4196 	do_cancel_effects(r, lkb, error);
4197  out:
4198 	unlock_rsb(r);
4199 	put_rsb(r);
4200 	dlm_put_lkb(lkb);
4201 	return 0;
4202 
4203  fail:
4204 	setup_local_lkb(ls, ms);
4205 	send_cancel_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4206 	return error;
4207 }
4208 
4209 static int receive_grant(struct dlm_ls *ls, const struct dlm_message *ms)
4210 {
4211 	struct dlm_lkb *lkb;
4212 	struct dlm_rsb *r;
4213 	int error;
4214 
4215 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4216 	if (error)
4217 		return error;
4218 
4219 	r = lkb->lkb_resource;
4220 
4221 	hold_rsb(r);
4222 	lock_rsb(r);
4223 
4224 	error = validate_message(lkb, ms);
4225 	if (error)
4226 		goto out;
4227 
4228 	receive_flags_reply(lkb, ms, false);
4229 	if (is_altmode(lkb))
4230 		munge_altmode(lkb, ms);
4231 	grant_lock_pc(r, lkb, ms);
4232 	queue_cast(r, lkb, 0);
4233  out:
4234 	unlock_rsb(r);
4235 	put_rsb(r);
4236 	dlm_put_lkb(lkb);
4237 	return 0;
4238 }
4239 
4240 static int receive_bast(struct dlm_ls *ls, const struct dlm_message *ms)
4241 {
4242 	struct dlm_lkb *lkb;
4243 	struct dlm_rsb *r;
4244 	int error;
4245 
4246 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4247 	if (error)
4248 		return error;
4249 
4250 	r = lkb->lkb_resource;
4251 
4252 	hold_rsb(r);
4253 	lock_rsb(r);
4254 
4255 	error = validate_message(lkb, ms);
4256 	if (error)
4257 		goto out;
4258 
4259 	queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode));
4260 	lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
4261  out:
4262 	unlock_rsb(r);
4263 	put_rsb(r);
4264 	dlm_put_lkb(lkb);
4265 	return 0;
4266 }
4267 
4268 static void receive_lookup(struct dlm_ls *ls, const struct dlm_message *ms)
4269 {
4270 	int len, error, ret_nodeid, from_nodeid, our_nodeid;
4271 
4272 	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4273 	our_nodeid = dlm_our_nodeid();
4274 
4275 	len = receive_extralen(ms);
4276 
4277 	error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4278 				  &ret_nodeid, NULL);
4279 
4280 	/* Optimization: we're master so treat lookup as a request */
4281 	if (!error && ret_nodeid == our_nodeid) {
4282 		receive_request(ls, ms);
4283 		return;
4284 	}
4285 	send_lookup_reply(ls, ms, ret_nodeid, error);
4286 }
4287 
4288 static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
4289 {
4290 	char name[DLM_RESNAME_MAXLEN+1];
4291 	struct dlm_rsb *r;
4292 	int rv, len, dir_nodeid, from_nodeid;
4293 
4294 	from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4295 
4296 	len = receive_extralen(ms);
4297 
4298 	if (len > DLM_RESNAME_MAXLEN) {
4299 		log_error(ls, "receive_remove from %d bad len %d",
4300 			  from_nodeid, len);
4301 		return;
4302 	}
4303 
4304 	dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash));
4305 	if (dir_nodeid != dlm_our_nodeid()) {
4306 		log_error(ls, "receive_remove from %d bad nodeid %d",
4307 			  from_nodeid, dir_nodeid);
4308 		return;
4309 	}
4310 
4311 	/*
4312 	 * Look for inactive rsb, if it's there, free it.
4313 	 * If the rsb is active, it's being used, and we should ignore this
4314 	 * message.  This is an expected race between the dir node sending a
4315 	 * request to the master node at the same time as the master node sends
4316 	 * a remove to the dir node.  The resolution to that race is for the
4317 	 * dir node to ignore the remove message, and the master node to
4318 	 * recreate the master rsb when it gets a request from the dir node for
4319 	 * an rsb it doesn't have.
4320 	 */
4321 
4322 	memset(name, 0, sizeof(name));
4323 	memcpy(name, ms->m_extra, len);
4324 
4325 	rcu_read_lock();
4326 	rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
4327 	if (rv) {
4328 		rcu_read_unlock();
4329 		/* should not happen */
4330 		log_error(ls, "%s from %d not found %s", __func__,
4331 			  from_nodeid, name);
4332 		return;
4333 	}
4334 
4335 	write_lock_bh(&ls->ls_rsbtbl_lock);
4336 	if (!rsb_flag(r, RSB_HASHED)) {
4337 		rcu_read_unlock();
4338 		write_unlock_bh(&ls->ls_rsbtbl_lock);
4339 		/* should not happen */
4340 		log_error(ls, "%s from %d got removed during removal %s",
4341 			  __func__, from_nodeid, name);
4342 		return;
4343 	}
4344 	/* at this stage the rsb can only be freed here */
4345 	rcu_read_unlock();
4346 
4347 	if (!rsb_flag(r, RSB_INACTIVE)) {
4348 		if (r->res_master_nodeid != from_nodeid) {
4349 			/* should not happen */
4350 			log_error(ls, "receive_remove on active rsb from %d master %d",
4351 				  from_nodeid, r->res_master_nodeid);
4352 			dlm_print_rsb(r);
4353 			write_unlock_bh(&ls->ls_rsbtbl_lock);
4354 			return;
4355 		}
4356 
4357 		/* Ignore the remove message, see race comment above. */
4358 
4359 		log_debug(ls, "receive_remove from %d master %d first %x %s",
4360 			  from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4361 			  name);
4362 		write_unlock_bh(&ls->ls_rsbtbl_lock);
4363 		return;
4364 	}
4365 
4366 	if (r->res_master_nodeid != from_nodeid) {
4367 		log_error(ls, "receive_remove inactive from %d master %d",
4368 			  from_nodeid, r->res_master_nodeid);
4369 		dlm_print_rsb(r);
4370 		write_unlock_bh(&ls->ls_rsbtbl_lock);
4371 		return;
4372 	}
4373 
4374 	list_del(&r->res_slow_list);
4375 	rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
4376 			       dlm_rhash_rsb_params);
4377 	rsb_clear_flag(r, RSB_HASHED);
4378 	write_unlock_bh(&ls->ls_rsbtbl_lock);
4379 
4380 	free_inactive_rsb(r);
4381 }
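
/* The removal above is done in two steps: dlm_search_rsb_tree() under
   rcu_read_lock() finds the rsb without blocking, then ls_rsbtbl_lock is
   taken for write and RSB_HASHED is rechecked, since the rsb could have
   been freed in between.  Only an inactive rsb whose master matches the
   sending node is unhashed and freed. */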
4382 
4383 static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms)
4384 {
4385 	do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
4386 }
4387 
4388 static int receive_request_reply(struct dlm_ls *ls,
4389 				 const struct dlm_message *ms)
4390 {
4391 	struct dlm_lkb *lkb;
4392 	struct dlm_rsb *r;
4393 	int error, mstype, result;
4394 	int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4395 
4396 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4397 	if (error)
4398 		return error;
4399 
4400 	r = lkb->lkb_resource;
4401 	hold_rsb(r);
4402 	lock_rsb(r);
4403 
4404 	error = validate_message(lkb, ms);
4405 	if (error)
4406 		goto out;
4407 
4408 	mstype = lkb->lkb_wait_type;
4409 	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4410 	if (error) {
4411 		log_error(ls, "receive_request_reply %x remote %d %x result %d",
4412 			  lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid),
4413 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4414 		dlm_dump_rsb(r);
4415 		goto out;
4416 	}
4417 
4418 	/* Optimization: the dir node was also the master, so it took our
4419 	   lookup as a request and sent request reply instead of lookup reply */
4420 	if (mstype == DLM_MSG_LOOKUP) {
4421 		r->res_master_nodeid = from_nodeid;
4422 		r->res_nodeid = from_nodeid;
4423 		lkb->lkb_nodeid = from_nodeid;
4424 	}
4425 
4426 	/* this is the value returned from do_request() on the master */
4427 	result = from_dlm_errno(le32_to_cpu(ms->m_result));
4428 
4429 	switch (result) {
4430 	case -EAGAIN:
4431 		/* request would block (be queued) on remote master */
4432 		queue_cast(r, lkb, -EAGAIN);
4433 		confirm_master(r, -EAGAIN);
4434 		unhold_lkb(lkb); /* undoes create_lkb() */
4435 		break;
4436 
4437 	case -EINPROGRESS:
4438 	case 0:
4439 		/* request was queued or granted on remote master */
4440 		receive_flags_reply(lkb, ms, false);
4441 		lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
4442 		if (is_altmode(lkb))
4443 			munge_altmode(lkb, ms);
4444 		if (result) {
4445 			add_lkb(r, lkb, DLM_LKSTS_WAITING);
4446 		} else {
4447 			grant_lock_pc(r, lkb, ms);
4448 			queue_cast(r, lkb, 0);
4449 		}
4450 		confirm_master(r, result);
4451 		break;
4452 
4453 	case -EBADR:
4454 	case -ENOTBLK:
4455 		/* find_rsb failed to find rsb or rsb wasn't master */
4456 		log_limit(ls, "receive_request_reply %x from %d %d "
4457 			  "master %d dir %d first %x %s", lkb->lkb_id,
4458 			  from_nodeid, result, r->res_master_nodeid,
4459 			  r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4460 
4461 		if (r->res_dir_nodeid != dlm_our_nodeid() &&
4462 		    r->res_master_nodeid != dlm_our_nodeid()) {
4463 			/* cause _request_lock->set_master->send_lookup */
4464 			r->res_master_nodeid = 0;
4465 			r->res_nodeid = -1;
4466 			lkb->lkb_nodeid = -1;
4467 		}
4468 
4469 		if (is_overlap(lkb)) {
4470 			/* we'll ignore error in cancel/unlock reply */
4471 			queue_cast_overlap(r, lkb);
4472 			confirm_master(r, result);
4473 			unhold_lkb(lkb); /* undoes create_lkb() */
4474 		} else {
4475 			_request_lock(r, lkb);
4476 
4477 			if (r->res_master_nodeid == dlm_our_nodeid())
4478 				confirm_master(r, 0);
4479 		}
4480 		break;
4481 
4482 	default:
4483 		log_error(ls, "receive_request_reply %x error %d",
4484 			  lkb->lkb_id, result);
4485 	}
4486 
4487 	if ((result == 0 || result == -EINPROGRESS) &&
4488 	    test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
4489 		log_debug(ls, "receive_request_reply %x result %d unlock",
4490 			  lkb->lkb_id, result);
4491 		clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4492 		send_unlock(r, lkb);
4493 	} else if ((result == -EINPROGRESS) &&
4494 		   test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
4495 				      &lkb->lkb_iflags)) {
4496 		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4497 		clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4498 		send_cancel(r, lkb);
4499 	} else {
4500 		clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4501 		clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4502 	}
4503  out:
4504 	unlock_rsb(r);
4505 	put_rsb(r);
4506 	dlm_put_lkb(lkb);
4507 	return 0;
4508 }
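
/* Note on the tail of receive_request_reply() above: if the user raced
   an unlock or cancel against the original request (the OVERLAP bits),
   this reply is the first safe point to act on it, so a granted or
   queued request is immediately followed by the overlapping unlock, and
   a still-queued one by the overlapping cancel. */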
4509 
4510 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4511 				    const struct dlm_message *ms, bool local)
4512 {
4513 	/* this is the value returned from do_convert() on the master */
4514 	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4515 	case -EAGAIN:
4516 		/* convert would block (be queued) on remote master */
4517 		queue_cast(r, lkb, -EAGAIN);
4518 		break;
4519 
4520 	case -EDEADLK:
4521 		receive_flags_reply(lkb, ms, local);
4522 		revert_lock_pc(r, lkb);
4523 		queue_cast(r, lkb, -EDEADLK);
4524 		break;
4525 
4526 	case -EINPROGRESS:
4527 		/* convert was queued on remote master */
4528 		receive_flags_reply(lkb, ms, local);
4529 		if (is_demoted(lkb))
4530 			munge_demoted(lkb);
4531 		del_lkb(r, lkb);
4532 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4533 		break;
4534 
4535 	case 0:
4536 		/* convert was granted on remote master */
4537 		receive_flags_reply(lkb, ms, local);
4538 		if (is_demoted(lkb))
4539 			munge_demoted(lkb);
4540 		grant_lock_pc(r, lkb, ms);
4541 		queue_cast(r, lkb, 0);
4542 		break;
4543 
4544 	default:
4545 		log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4546 			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4547 			  le32_to_cpu(ms->m_lkid),
4548 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4549 		dlm_print_rsb(r);
4550 		dlm_print_lkb(lkb);
4551 	}
4552 }
4553 
4554 static void _receive_convert_reply(struct dlm_lkb *lkb,
4555 				   const struct dlm_message *ms, bool local)
4556 {
4557 	struct dlm_rsb *r = lkb->lkb_resource;
4558 	int error;
4559 
4560 	hold_rsb(r);
4561 	lock_rsb(r);
4562 
4563 	error = validate_message(lkb, ms);
4564 	if (error)
4565 		goto out;
4566 
4567 	error = remove_from_waiters_ms(lkb, ms, local);
4568 	if (error)
4569 		goto out;
4570 
4571 	__receive_convert_reply(r, lkb, ms, local);
4572  out:
4573 	unlock_rsb(r);
4574 	put_rsb(r);
4575 }
4576 
4577 static int receive_convert_reply(struct dlm_ls *ls,
4578 				 const struct dlm_message *ms)
4579 {
4580 	struct dlm_lkb *lkb;
4581 	int error;
4582 
4583 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4584 	if (error)
4585 		return error;
4586 
4587 	_receive_convert_reply(lkb, ms, false);
4588 	dlm_put_lkb(lkb);
4589 	return 0;
4590 }
4591 
4592 static void _receive_unlock_reply(struct dlm_lkb *lkb,
4593 				  const struct dlm_message *ms, bool local)
4594 {
4595 	struct dlm_rsb *r = lkb->lkb_resource;
4596 	int error;
4597 
4598 	hold_rsb(r);
4599 	lock_rsb(r);
4600 
4601 	error = validate_message(lkb, ms);
4602 	if (error)
4603 		goto out;
4604 
4605 	error = remove_from_waiters_ms(lkb, ms, local);
4606 	if (error)
4607 		goto out;
4608 
4609 	/* this is the value returned from do_unlock() on the master */
4610 
4611 	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4612 	case -DLM_EUNLOCK:
4613 		receive_flags_reply(lkb, ms, local);
4614 		remove_lock_pc(r, lkb);
4615 		queue_cast(r, lkb, -DLM_EUNLOCK);
4616 		break;
4617 	case -ENOENT:
4618 		break;
4619 	default:
4620 		log_error(r->res_ls, "receive_unlock_reply %x error %d",
4621 			  lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
4622 	}
4623  out:
4624 	unlock_rsb(r);
4625 	put_rsb(r);
4626 }
4627 
4628 static int receive_unlock_reply(struct dlm_ls *ls,
4629 				const struct dlm_message *ms)
4630 {
4631 	struct dlm_lkb *lkb;
4632 	int error;
4633 
4634 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4635 	if (error)
4636 		return error;
4637 
4638 	_receive_unlock_reply(lkb, ms, false);
4639 	dlm_put_lkb(lkb);
4640 	return 0;
4641 }
4642 
4643 static void _receive_cancel_reply(struct dlm_lkb *lkb,
4644 				  const struct dlm_message *ms, bool local)
4645 {
4646 	struct dlm_rsb *r = lkb->lkb_resource;
4647 	int error;
4648 
4649 	hold_rsb(r);
4650 	lock_rsb(r);
4651 
4652 	error = validate_message(lkb, ms);
4653 	if (error)
4654 		goto out;
4655 
4656 	error = remove_from_waiters_ms(lkb, ms, local);
4657 	if (error)
4658 		goto out;
4659 
4660 	/* this is the value returned from do_cancel() on the master */
4661 
4662 	switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4663 	case -DLM_ECANCEL:
4664 		receive_flags_reply(lkb, ms, local);
4665 		revert_lock_pc(r, lkb);
4666 		queue_cast(r, lkb, -DLM_ECANCEL);
4667 		break;
4668 	case 0:
4669 		break;
4670 	default:
4671 		log_error(r->res_ls, "receive_cancel_reply %x error %d",
4672 			  lkb->lkb_id,
4673 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4674 	}
4675  out:
4676 	unlock_rsb(r);
4677 	put_rsb(r);
4678 }
4679 
4680 static int receive_cancel_reply(struct dlm_ls *ls,
4681 				const struct dlm_message *ms)
4682 {
4683 	struct dlm_lkb *lkb;
4684 	int error;
4685 
4686 	error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4687 	if (error)
4688 		return error;
4689 
4690 	_receive_cancel_reply(lkb, ms, false);
4691 	dlm_put_lkb(lkb);
4692 	return 0;
4693 }
4694 
4695 static void receive_lookup_reply(struct dlm_ls *ls,
4696 				 const struct dlm_message *ms)
4697 {
4698 	struct dlm_lkb *lkb;
4699 	struct dlm_rsb *r;
4700 	int error, ret_nodeid;
4701 	int do_lookup_list = 0;
4702 
4703 	error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
4704 	if (error) {
4705 		log_error(ls, "%s no lkid %x", __func__,
4706 			  le32_to_cpu(ms->m_lkid));
4707 		return;
4708 	}
4709 
4710 	/* ms->m_result is the value returned by dlm_master_lookup on dir node
4711 	   FIXME: will a non-zero error ever be returned? */
4712 
4713 	r = lkb->lkb_resource;
4714 	hold_rsb(r);
4715 	lock_rsb(r);
4716 
4717 	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4718 	if (error)
4719 		goto out;
4720 
4721 	ret_nodeid = le32_to_cpu(ms->m_nodeid);
4722 
4723 	/* We sometimes receive a request from the dir node for this
4724 	   rsb before we've received the dir node's lookup_reply for it.
4725 	   The request from the dir node implies we're the master, so we set
4726 	   ourselves as master in receive_request_reply, and verify here that
4727 	   we are indeed the master. */
4728 
4729 	if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4730 		/* This should never happen */
4731 		log_error(ls, "receive_lookup_reply %x from %d ret %d "
4732 			  "master %d dir %d our %d first %x %s",
4733 			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4734 			  ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
4735 			  dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4736 	}
4737 
4738 	if (ret_nodeid == dlm_our_nodeid()) {
4739 		r->res_master_nodeid = ret_nodeid;
4740 		r->res_nodeid = 0;
4741 		do_lookup_list = 1;
4742 		r->res_first_lkid = 0;
4743 	} else if (ret_nodeid == -1) {
4744 		/* the remote node doesn't believe it's the dir node */
4745 		log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4746 			  lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
4747 		r->res_master_nodeid = 0;
4748 		r->res_nodeid = -1;
4749 		lkb->lkb_nodeid = -1;
4750 	} else {
4751 		/* set_master() will set lkb_nodeid from r */
4752 		r->res_master_nodeid = ret_nodeid;
4753 		r->res_nodeid = ret_nodeid;
4754 	}
4755 
4756 	if (is_overlap(lkb)) {
4757 		log_debug(ls, "receive_lookup_reply %x unlock %x",
4758 			  lkb->lkb_id, dlm_iflags_val(lkb));
4759 		queue_cast_overlap(r, lkb);
4760 		unhold_lkb(lkb); /* undoes create_lkb() */
4761 		goto out_list;
4762 	}
4763 
4764 	_request_lock(r, lkb);
4765 
4766  out_list:
4767 	if (do_lookup_list)
4768 		process_lookup_list(r);
4769  out:
4770 	unlock_rsb(r);
4771 	put_rsb(r);
4772 	dlm_put_lkb(lkb);
4773 }
4774 
4775 static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4776 			     uint32_t saved_seq)
4777 {
4778 	int error = 0, noent = 0;
4779 
4780 	if (WARN_ON_ONCE(!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid)))) {
4781 		log_limit(ls, "receive %d from non-member %d %x %x %d",
4782 			  le32_to_cpu(ms->m_type),
4783 			  le32_to_cpu(ms->m_header.h_nodeid),
4784 			  le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4785 			  from_dlm_errno(le32_to_cpu(ms->m_result)));
4786 		return;
4787 	}
4788 
4789 	switch (ms->m_type) {
4790 
4791 	/* messages sent to a master node */
4792 
4793 	case cpu_to_le32(DLM_MSG_REQUEST):
4794 		error = receive_request(ls, ms);
4795 		break;
4796 
4797 	case cpu_to_le32(DLM_MSG_CONVERT):
4798 		error = receive_convert(ls, ms);
4799 		break;
4800 
4801 	case cpu_to_le32(DLM_MSG_UNLOCK):
4802 		error = receive_unlock(ls, ms);
4803 		break;
4804 
4805 	case cpu_to_le32(DLM_MSG_CANCEL):
4806 		noent = 1;
4807 		error = receive_cancel(ls, ms);
4808 		break;
4809 
4810 	/* messages sent from a master node (replies to above) */
4811 
4812 	case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
4813 		error = receive_request_reply(ls, ms);
4814 		break;
4815 
4816 	case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
4817 		error = receive_convert_reply(ls, ms);
4818 		break;
4819 
4820 	case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
4821 		error = receive_unlock_reply(ls, ms);
4822 		break;
4823 
4824 	case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
4825 		error = receive_cancel_reply(ls, ms);
4826 		break;
4827 
4828 	/* messages sent from a master node (only two types of async msg) */
4829 
4830 	case cpu_to_le32(DLM_MSG_GRANT):
4831 		noent = 1;
4832 		error = receive_grant(ls, ms);
4833 		break;
4834 
4835 	case cpu_to_le32(DLM_MSG_BAST):
4836 		noent = 1;
4837 		error = receive_bast(ls, ms);
4838 		break;
4839 
4840 	/* messages sent to a dir node */
4841 
4842 	case cpu_to_le32(DLM_MSG_LOOKUP):
4843 		receive_lookup(ls, ms);
4844 		break;
4845 
4846 	case cpu_to_le32(DLM_MSG_REMOVE):
4847 		receive_remove(ls, ms);
4848 		break;
4849 
4850 	/* messages sent from a dir node (remove has no reply) */
4851 
4852 	case cpu_to_le32(DLM_MSG_LOOKUP_REPLY):
4853 		receive_lookup_reply(ls, ms);
4854 		break;
4855 
4856 	/* other messages */
4857 
4858 	case cpu_to_le32(DLM_MSG_PURGE):
4859 		receive_purge(ls, ms);
4860 		break;
4861 
4862 	default:
4863 		log_error(ls, "unknown message type %d",
4864 			  le32_to_cpu(ms->m_type));
4865 	}
4866 
4867 	/*
4868 	 * When checking for ENOENT, we're checking the result of
4869 	 * find_lkb(m_remid):
4870 	 *
4871 	 * The lock id referenced in the message wasn't found.  This may
4872 	 * happen in normal usage for the async messages and cancel, so
4873 	 * only use log_debug for them.
4874 	 *
4875 	 * Some errors are expected and normal.
4876 	 */
4877 
4878 	if (error == -ENOENT && noent) {
4879 		log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4880 			  le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4881 			  le32_to_cpu(ms->m_header.h_nodeid),
4882 			  le32_to_cpu(ms->m_lkid), saved_seq);
4883 	} else if (error == -ENOENT) {
4884 		log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4885 			  le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4886 			  le32_to_cpu(ms->m_header.h_nodeid),
4887 			  le32_to_cpu(ms->m_lkid), saved_seq);
4888 
4889 		if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT))
4890 			dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash));
4891 	}
4892 
4893 	if (error == -EINVAL) {
4894 		log_error(ls, "receive %d inval from %d lkid %x remid %x "
4895 			  "saved_seq %u",
4896 			  le32_to_cpu(ms->m_type),
4897 			  le32_to_cpu(ms->m_header.h_nodeid),
4898 			  le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4899 			  saved_seq);
4900 	}
4901 }
4902 
4903 /* If the lockspace is in recovery mode (locking stopped), then normal
4904    messages are saved on the requestqueue for processing after recovery is
4905    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4906    messages off the requestqueue before we process new ones. This occurs right
4907    after recovery completes when we transition from saving all messages on
4908    requestqueue, to processing all the saved messages, to processing new
4909    messages as they arrive. */
4910 
4911 static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4912 				int nodeid)
4913 {
4914 try_again:
4915 	read_lock_bh(&ls->ls_requestqueue_lock);
4916 	if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4917 		/* If we were a member of this lockspace, left, and rejoined,
4918 		   other nodes may still be sending us messages from the
4919 		   lockspace generation before we left. */
4920 		if (WARN_ON_ONCE(!ls->ls_generation)) {
4921 			read_unlock_bh(&ls->ls_requestqueue_lock);
4922 			log_limit(ls, "receive %d from %d ignore old gen",
4923 				  le32_to_cpu(ms->m_type), nodeid);
4924 			return;
4925 		}
4926 
4927 		read_unlock_bh(&ls->ls_requestqueue_lock);
4928 		write_lock_bh(&ls->ls_requestqueue_lock);
4929 		/* recheck because we hold writelock now */
4930 		if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4931 			write_unlock_bh(&ls->ls_requestqueue_lock);
4932 			goto try_again;
4933 		}
4934 
4935 		dlm_add_requestqueue(ls, nodeid, ms);
4936 		write_unlock_bh(&ls->ls_requestqueue_lock);
4937 	} else {
4938 		_receive_message(ls, ms, 0);
4939 		read_unlock_bh(&ls->ls_requestqueue_lock);
4940 	}
4941 }
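
/* Note the locking pattern above: the common case takes
   ls_requestqueue_lock only for read; when LSFL_RECV_MSG_BLOCKED is set,
   the read lock is dropped, the write lock taken, and the flag rechecked
   under the write lock (jumping back to try_again if it was cleared in
   the meantime) before the message is saved with dlm_add_requestqueue(). */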
4942 
4943 /* This is called by dlm_recoverd to process messages that were saved on
4944    the requestqueue. */
4945 
4946 void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
4947 			       uint32_t saved_seq)
4948 {
4949 	_receive_message(ls, ms, saved_seq);
4950 }
4951 
4952 /* This is called by the midcomms layer when something is received for
4953    the lockspace.  It could be either a MSG (normal message sent as part of
4954    standard locking activity) or an RCOM (recovery message sent as part of
4955    lockspace recovery). */
4956 
4957 void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
4958 {
4959 	const struct dlm_header *hd = &p->header;
4960 	struct dlm_ls *ls;
4961 	int type = 0;
4962 
4963 	switch (hd->h_cmd) {
4964 	case DLM_MSG:
4965 		type = le32_to_cpu(p->message.m_type);
4966 		break;
4967 	case DLM_RCOM:
4968 		type = le32_to_cpu(p->rcom.rc_type);
4969 		break;
4970 	default:
4971 		log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4972 		return;
4973 	}
4974 
4975 	if (le32_to_cpu(hd->h_nodeid) != nodeid) {
4976 		log_print("invalid h_nodeid %d from %d lockspace %x",
4977 			  le32_to_cpu(hd->h_nodeid), nodeid,
4978 			  le32_to_cpu(hd->u.h_lockspace));
4979 		return;
4980 	}
4981 
4982 	ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace));
4983 	if (!ls) {
4984 		if (dlm_config.ci_log_debug) {
4985 			printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
4986 				"%u from %d cmd %d type %d\n",
4987 				le32_to_cpu(hd->u.h_lockspace), nodeid,
4988 				hd->h_cmd, type);
4989 		}
4990 
4991 		if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4992 			dlm_send_ls_not_ready(nodeid, &p->rcom);
4993 		return;
4994 	}
4995 
4996 	/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4997 	   be inactive (in this ls) before transitioning to recovery mode */
4998 
4999 	read_lock_bh(&ls->ls_recv_active);
5000 	if (hd->h_cmd == DLM_MSG)
5001 		dlm_receive_message(ls, &p->message, nodeid);
5002 	else if (hd->h_cmd == DLM_RCOM)
5003 		dlm_receive_rcom(ls, &p->rcom, nodeid);
5004 	else
5005 		log_error(ls, "invalid h_cmd %d from %d lockspace %x",
5006 			  hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
5007 	read_unlock_bh(&ls->ls_recv_active);
5008 
5009 	dlm_put_lockspace(ls);
5010 }
5011 
5012 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
5013 				   struct dlm_message *ms_local)
5014 {
5015 	if (middle_conversion(lkb)) {
5016 		log_rinfo(ls, "%s %x middle convert in progress", __func__,
5017 			 lkb->lkb_id);
5018 
5019 		/* We sent this lock to the new master. The new master will
5020 		 * tell us when it's granted.  We no longer need a reply, so
5021 		 * use a fake reply to put the lkb into the right state.
5022 		 */
5023 		hold_lkb(lkb);
5024 		memset(ms_local, 0, sizeof(struct dlm_message));
5025 		ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
5026 		ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
5027 		ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5028 		_receive_convert_reply(lkb, ms_local, true);
5029 		unhold_lkb(lkb);
5030 
5031 	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
5032 		set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5033 	}
5034 
5035 	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5036 	   conversions are async; there's no reply from the remote master */
5037 }
5038 
5039 /* A waiting lkb needs recovery if the master node has failed, or
5040    the master node is changing (only when no directory is used) */
5041 
5042 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5043 				 int dir_nodeid)
5044 {
5045 	if (dlm_no_directory(ls))
5046 		return 1;
5047 
5048 	if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5049 		return 1;
5050 
5051 	return 0;
5052 }
5053 
5054 /* Recovery for locks that are waiting for replies from nodes that are now
5055    gone.  We can just complete unlocks and cancels by faking a reply from the
5056    dead node.  Requests and up-conversions are flagged to be resent after
5057    recovery.  Down-conversions can just be completed with a fake reply like
5058    unlocks.  Conversions between PR and CW need special attention. */
5059 
5060 void dlm_recover_waiters_pre(struct dlm_ls *ls)
5061 {
5062 	struct dlm_lkb *lkb, *safe;
5063 	struct dlm_message *ms_local;
5064 	int wait_type, local_unlock_result, local_cancel_result;
5065 	int dir_nodeid;
5066 
5067 	ms_local = kmalloc(sizeof(*ms_local), GFP_KERNEL);
5068 	if (!ms_local)
5069 		return;
5070 
5071 	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5072 
5073 		dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5074 
5075 		/* exclude debug messages about unlocks because there can be so
5076 		   many and they aren't very interesting */
5077 
5078 		if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5079 			log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5080 				  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5081 				  lkb->lkb_id,
5082 				  lkb->lkb_remid,
5083 				  lkb->lkb_wait_type,
5084 				  lkb->lkb_resource->res_nodeid,
5085 				  lkb->lkb_nodeid,
5086 				  lkb->lkb_wait_nodeid,
5087 				  dir_nodeid);
5088 		}
5089 
5090 		/* all outstanding lookups, regardless of destination, will be
5091 		   resent after recovery is done */
5092 
5093 		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5094 			set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5095 			continue;
5096 		}
5097 
5098 		if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5099 			continue;
5100 
5101 		wait_type = lkb->lkb_wait_type;
5102 		local_unlock_result = -DLM_EUNLOCK;
5103 		local_cancel_result = -DLM_ECANCEL;
5104 
5105 		/* Main reply may have been received leaving a zero wait_type,
5106 		   but a reply for the overlapping op may not have been
5107 		   received.  In that case we need to fake the appropriate
5108 		   reply for the overlap op. */
5109 
5110 		if (!wait_type) {
5111 			if (is_overlap_cancel(lkb)) {
5112 				wait_type = DLM_MSG_CANCEL;
5113 				if (lkb->lkb_grmode == DLM_LOCK_IV)
5114 					local_cancel_result = 0;
5115 			}
5116 			if (is_overlap_unlock(lkb)) {
5117 				wait_type = DLM_MSG_UNLOCK;
5118 				if (lkb->lkb_grmode == DLM_LOCK_IV)
5119 					local_unlock_result = -ENOENT;
5120 			}
5121 
5122 			log_debug(ls, "rwpre overlap %x %x %d %d %d",
5123 				  lkb->lkb_id, dlm_iflags_val(lkb), wait_type,
5124 				  local_cancel_result, local_unlock_result);
5125 		}
5126 
5127 		switch (wait_type) {
5128 
5129 		case DLM_MSG_REQUEST:
5130 			set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5131 			break;
5132 
5133 		case DLM_MSG_CONVERT:
5134 			recover_convert_waiter(ls, lkb, ms_local);
5135 			break;
5136 
5137 		case DLM_MSG_UNLOCK:
5138 			hold_lkb(lkb);
5139 			memset(ms_local, 0, sizeof(struct dlm_message));
5140 			ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
5141 			ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result));
5142 			ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5143 			_receive_unlock_reply(lkb, ms_local, true);
5144 			dlm_put_lkb(lkb);
5145 			break;
5146 
5147 		case DLM_MSG_CANCEL:
5148 			hold_lkb(lkb);
5149 			memset(ms_local, 0, sizeof(struct dlm_message));
5150 			ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
5151 			ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result));
5152 			ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5153 			_receive_cancel_reply(lkb, ms_local, true);
5154 			dlm_put_lkb(lkb);
5155 			break;
5156 
5157 		default:
5158 			log_error(ls, "invalid lkb wait_type %d %d",
5159 				  lkb->lkb_wait_type, wait_type);
5160 		}
5161 		schedule();
5162 	}
5163 	kfree(ms_local);
5164 }
5165 
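/* Find the next lkb on the waiters list that dlm_recover_waiters_pre flagged
   with DLM_IFL_RESEND_BIT, and return it with a reference held (hold_lkb)
   that the caller must put. */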
5166 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5167 {
5168 	struct dlm_lkb *lkb = NULL, *iter;
5169 
5170 	spin_lock_bh(&ls->ls_waiters_lock);
5171 	list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
5172 		if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
5173 			hold_lkb(iter);
5174 			lkb = iter;
5175 			break;
5176 		}
5177 	}
5178 	spin_unlock_bh(&ls->ls_waiters_lock);
5179 
5180 	return lkb;
5181 }
5182 
5183 /*
5184  * Forced state reset for locks that were in the middle of remote operations
5185  * when recovery happened (i.e. lkbs that were on the waiters list, waiting
5186  * for a reply from a remote operation.)  The lkbs remaining on the waiters
5187  * list need to be reevaluated; some may need resending to a different node
5188  * than previously, and some may now need local handling rather than remote.
5189  *
5190  * First, the lkb state for the voided remote operation is forcibly reset,
5191  * equivalent to what remove_from_waiters() would normally do:
5192  * . lkb removed from ls_waiters list
5193  * . lkb wait_type cleared
5194  * . lkb waiters_count cleared
5195  * . lkb ref count decremented for each waiters_count (almost always 1,
5196  *   but possibly 2 in case of cancel/unlock overlapping, which means
5197  *   two remote replies were being expected for the lkb.)
5198  *
5199  * Second, the lkb is reprocessed like an original operation would be,
5200  * by passing it to _request_lock or _convert_lock, which will either
5201  * process the lkb operation locally, or send it to a remote node again
5202  * and put the lkb back onto the waiters list.
5203  *
5204  * When reprocessing the lkb, we may find that it's flagged for an overlapping
5205  * force-unlock or cancel, either from before recovery began, or after recovery
5206  * finished.  If this is the case, the unlock/cancel is done directly, and the
5207  * original operation is not initiated again (no _request_lock/_convert_lock.)
5208  */
5209 
5210 int dlm_recover_waiters_post(struct dlm_ls *ls)
5211 {
5212 	struct dlm_lkb *lkb;
5213 	struct dlm_rsb *r;
5214 	int error = 0, mstype, err, oc, ou;
5215 
5216 	while (1) {
5217 		if (dlm_locking_stopped(ls)) {
5218 			log_debug(ls, "recover_waiters_post aborted");
5219 			error = -EINTR;
5220 			break;
5221 		}
5222 
5223 		/*
5224 		 * Find an lkb from the waiters list that's been affected by
5225 		 * recovery node changes, and needs to be reprocessed.  Does
5226 		 * hold_lkb(), adding a refcount.
5227 		 */
5228 		lkb = find_resend_waiter(ls);
5229 		if (!lkb)
5230 			break;
5231 
5232 		r = lkb->lkb_resource;
5233 		hold_rsb(r);
5234 		lock_rsb(r);
5235 
5236 		/*
5237 		 * If the lkb has been flagged for a force unlock or cancel,
5238 		 * then the reprocessing below will be replaced by just doing
5239 		 * the unlock/cancel directly.
5240 		 */
5241 		mstype = lkb->lkb_wait_type;
5242 		oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
5243 					&lkb->lkb_iflags);
5244 		ou = test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT,
5245 					&lkb->lkb_iflags);
5246 		err = 0;
5247 
5248 		log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5249 			  "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5250 			  "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5251 			  r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5252 			  dlm_dir_nodeid(r), oc, ou);
5253 
5254 		/*
5255 		 * No reply to the pre-recovery operation will now be received,
5256 		 * so a forced equivalent of remove_from_waiters() is needed to
5257 		 * reset the waiters state that was in place before recovery.
5258 		 */
5259 
5260 		clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5261 
5262 		/* Forcibly clear wait_type */
5263 		lkb->lkb_wait_type = 0;
5264 
5265 		/*
5266 		 * Forcibly reset wait_count and associated refcount.  The
5267 		 * wait_count will almost always be 1, but in case of an
5268 		 * overlapping unlock/cancel it could be 2: see where
5269 		 * add_to_waiters() finds the lkb is already on the waiters
5270 		 * list and does lkb_wait_count++; hold_lkb().
5271 		 */
5272 		while (lkb->lkb_wait_count) {
5273 			lkb->lkb_wait_count--;
5274 			unhold_lkb(lkb);
5275 		}
5276 
5277 		/* Forcibly remove from waiters list */
5278 		spin_lock_bh(&ls->ls_waiters_lock);
5279 		list_del_init(&lkb->lkb_wait_reply);
5280 		spin_unlock_bh(&ls->ls_waiters_lock);
5281 
5282 		/*
5283 		 * The lkb is now clear of all prior waiters state and can be
5284 		 * processed locally, or sent to remote node again, or directly
5285 		 * cancelled/unlocked.
5286 		 */
5287 
5288 		if (oc || ou) {
5289 			/* do an unlock or cancel instead of resending */
5290 			switch (mstype) {
5291 			case DLM_MSG_LOOKUP:
5292 			case DLM_MSG_REQUEST:
5293 				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5294 							-DLM_ECANCEL);
5295 				unhold_lkb(lkb); /* undoes create_lkb() */
5296 				break;
5297 			case DLM_MSG_CONVERT:
5298 				if (oc) {
5299 					queue_cast(r, lkb, -DLM_ECANCEL);
5300 				} else {
5301 					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5302 					_unlock_lock(r, lkb);
5303 				}
5304 				break;
5305 			default:
5306 				err = 1;
5307 			}
5308 		} else {
5309 			switch (mstype) {
5310 			case DLM_MSG_LOOKUP:
5311 			case DLM_MSG_REQUEST:
5312 				_request_lock(r, lkb);
5313 				if (r->res_nodeid != -1 && is_master(r))
5314 					confirm_master(r, 0);
5315 				break;
5316 			case DLM_MSG_CONVERT:
5317 				_convert_lock(r, lkb);
5318 				break;
5319 			default:
5320 				err = 1;
5321 			}
5322 		}
5323 
5324 		if (err) {
5325 			log_error(ls, "waiter %x msg %d r_nodeid %d "
5326 				  "dir_nodeid %d overlap %d %d",
5327 				  lkb->lkb_id, mstype, r->res_nodeid,
5328 				  dlm_dir_nodeid(r), oc, ou);
5329 		}
5330 		unlock_rsb(r);
5331 		put_rsb(r);
5332 		dlm_put_lkb(lkb);
5333 	}
5334 
5335 	return error;
5336 }
5337 
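/* Remove master-copy lkbs from the given rsb queue, except those added by
   dlm_recover_master_copy during the current recovery sequence. */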
5338 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5339 			      struct list_head *list)
5340 {
5341 	struct dlm_lkb *lkb, *safe;
5342 
5343 	list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5344 		if (!is_master_copy(lkb))
5345 			continue;
5346 
5347 		/* don't purge lkbs we've added in recover_master_copy for
5348 		   the current recovery seq */
5349 
5350 		if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5351 			continue;
5352 
5353 		del_lkb(r, lkb);
5354 
5355 		/* this put should free the lkb */
5356 		if (!dlm_put_lkb(lkb))
5357 			log_error(ls, "purged mstcpy lkb not released");
5358 	}
5359 }
5360 
5361 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5362 {
5363 	struct dlm_ls *ls = r->res_ls;
5364 
5365 	purge_mstcpy_list(ls, r, &r->res_grantqueue);
5366 	purge_mstcpy_list(ls, r, &r->res_convertqueue);
5367 	purge_mstcpy_list(ls, r, &r->res_waitqueue);
5368 }
5369 
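/* Remove master-copy lkbs held by nodes that have left the lockspace.  If a
   departed node held the lock in PW/EX with a VALBLK lvb, flag the rsb so
   recover_lvb invalidates the lvb; also flag the rsb for RECOVER_GRANT so
   the remaining locks are re-evaluated for granting. */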
5370 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5371 			    struct list_head *list,
5372 			    int nodeid_gone, unsigned int *count)
5373 {
5374 	struct dlm_lkb *lkb, *safe;
5375 
5376 	list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5377 		if (!is_master_copy(lkb))
5378 			continue;
5379 
5380 		if ((lkb->lkb_nodeid == nodeid_gone) ||
5381 		    dlm_is_removed(ls, lkb->lkb_nodeid)) {
5382 
5383 			/* tell recover_lvb to invalidate the lvb
5384 			   because a node holding EX/PW failed */
5385 			if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5386 			    (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5387 				rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5388 			}
5389 
5390 			del_lkb(r, lkb);
5391 
5392 			/* this put should free the lkb */
5393 			if (!dlm_put_lkb(lkb))
5394 				log_error(ls, "purged dead lkb not released");
5395 
5396 			rsb_set_flag(r, RSB_RECOVER_GRANT);
5397 
5398 			(*count)++;
5399 		}
5400 	}
5401 }
5402 
5403 /* Get rid of locks held by nodes that are gone. */
5404 
5405 void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list)
5406 {
5407 	struct dlm_rsb *r;
5408 	struct dlm_member *memb;
5409 	int nodes_count = 0;
5410 	int nodeid_gone = 0;
5411 	unsigned int lkb_count = 0;
5412 
5413 	/* cache one removed nodeid to optimize the common
5414 	   case of a single node removed */
5415 
5416 	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5417 		nodes_count++;
5418 		nodeid_gone = memb->nodeid;
5419 	}
5420 
5421 	if (!nodes_count)
5422 		return;
5423 
5424 	list_for_each_entry(r, root_list, res_root_list) {
5425 		lock_rsb(r);
5426 		if (r->res_nodeid != -1 && is_master(r)) {
5427 			purge_dead_list(ls, r, &r->res_grantqueue,
5428 					nodeid_gone, &lkb_count);
5429 			purge_dead_list(ls, r, &r->res_convertqueue,
5430 					nodeid_gone, &lkb_count);
5431 			purge_dead_list(ls, r, &r->res_waitqueue,
5432 					nodeid_gone, &lkb_count);
5433 		}
5434 		unlock_rsb(r);
5435 
5436 		cond_resched();
5437 	}
5438 
5439 	if (lkb_count)
5440 		log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5441 			  lkb_count, nodes_count);
5442 }
5443 
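/* Find an active rsb flagged RSB_RECOVER_GRANT that we master, and return it
   with a reference held; the flag is cleared on rsbs we no longer master. */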
5444 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
5445 {
5446 	struct dlm_rsb *r;
5447 
5448 	read_lock_bh(&ls->ls_rsbtbl_lock);
5449 	list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
5450 		if (!rsb_flag(r, RSB_RECOVER_GRANT))
5451 			continue;
5452 		if (!is_master(r)) {
5453 			rsb_clear_flag(r, RSB_RECOVER_GRANT);
5454 			continue;
5455 		}
5456 		hold_rsb(r);
5457 		read_unlock_bh(&ls->ls_rsbtbl_lock);
5458 		return r;
5459 	}
5460 	read_unlock_bh(&ls->ls_rsbtbl_lock);
5461 	return NULL;
5462 }
5463 
5464 /*
5465  * Attempt to grant locks on resources that we are the master of.
5466  * Locks may have become grantable during recovery because locks
5467  * from departed nodes have been purged (or not rebuilt), allowing
5468  * previously blocked locks to now be granted.  The subset of rsb's
5469  * we are interested in are those with lkb's on either the convert or
5470  * waiting queues.
5471  *
5472  * Simplest would be to go through each master rsb and check for non-empty
5473  * convert or waiting queues, and attempt to grant on those rsbs.
5474  * Checking the queues requires lock_rsb, though, for which we'd need
5475  * to release the rsbtbl lock.  This would make iterating through all
5476  * rsb's very inefficient.  So, we rely on earlier recovery routines
5477  * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5478  * locks for.
5479  */
5480 
5481 void dlm_recover_grant(struct dlm_ls *ls)
5482 {
5483 	struct dlm_rsb *r;
5484 	unsigned int count = 0;
5485 	unsigned int rsb_count = 0;
5486 	unsigned int lkb_count = 0;
5487 
5488 	while (1) {
5489 		r = find_grant_rsb(ls);
5490 		if (!r)
5491 			break;
5492 
5493 		rsb_count++;
5494 		count = 0;
5495 		lock_rsb(r);
5496 		/* the RECOVER_GRANT flag is checked in the grant path */
5497 		grant_pending_locks(r, &count);
5498 		rsb_clear_flag(r, RSB_RECOVER_GRANT);
5499 		lkb_count += count;
5500 		confirm_master(r, 0);
5501 		unlock_rsb(r);
5502 		put_rsb(r);
5503 		cond_resched();
5504 	}
5505 
5506 	if (lkb_count)
5507 		log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5508 			  lkb_count, rsb_count);
5509 }
5510 
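/* Look up an existing lkb by the owning node's id and that node's lock id
   (remid); search_remid() below checks the rsb's grant, convert and wait
   queues. */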
5511 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5512 					 uint32_t remid)
5513 {
5514 	struct dlm_lkb *lkb;
5515 
5516 	list_for_each_entry(lkb, head, lkb_statequeue) {
5517 		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5518 			return lkb;
5519 	}
5520 	return NULL;
5521 }
5522 
5523 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5524 				    uint32_t remid)
5525 {
5526 	struct dlm_lkb *lkb;
5527 
5528 	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5529 	if (lkb)
5530 		return lkb;
5531 	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5532 	if (lkb)
5533 		return lkb;
5534 	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5535 	if (lkb)
5536 		return lkb;
5537 	return NULL;
5538 }
5539 
5540 /* needs at least dlm_rcom + rcom_lock */
5541 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5542 				  struct dlm_rsb *r, const struct dlm_rcom *rc)
5543 {
5544 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5545 
5546 	lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5547 	lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5548 	lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5549 	lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5550 	dlm_set_dflags_val(lkb, le32_to_cpu(rl->rl_flags));
5551 	set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
5552 	lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5553 	lkb->lkb_rqmode = rl->rl_rqmode;
5554 	lkb->lkb_grmode = rl->rl_grmode;
5555 	/* don't set lkb_status because add_lkb wants to set it itself */
5556 
5557 	lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5558 	lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5559 
5560 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5561 		int lvblen = le16_to_cpu(rc->rc_header.h_length) -
5562 			sizeof(struct dlm_rcom) - sizeof(struct rcom_lock);
5563 		if (lvblen > ls->ls_lvblen)
5564 			return -EINVAL;
5565 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5566 		if (!lkb->lkb_lvbptr)
5567 			return -ENOMEM;
5568 		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5569 	}
5570 
5571 	/* Conversions between PR and CW (middle modes) need special handling.
5572 	   The real granted mode of these converting locks cannot be determined
5573 	   until all locks have been rebuilt on the rsb (recover_conversion) */
5574 
5575 	if (rl->rl_status == DLM_LKSTS_CONVERT && middle_conversion(lkb)) {
5576 		/* We may need to adjust grmode depending on other granted locks. */
5577 		log_limit(ls, "%s %x middle convert gr %d rq %d remote %d %x",
5578 			  __func__, lkb->lkb_id, lkb->lkb_grmode,
5579 			  lkb->lkb_rqmode, lkb->lkb_nodeid, lkb->lkb_remid);
5580 		rsb_set_flag(r, RSB_RECOVER_CONVERT);
5581 	}
5582 
5583 	return 0;
5584 }
5585 
5586 /* This lkb may have been recovered in a previous aborted recovery so we need
5587    to check if the rsb already has an lkb with the given remote nodeid/lkid.
5588    If so we just send back a standard reply.  If not, we create a new lkb with
5589    the given values and send back our lkid.  We send back our lkid by sending
5590    back the rcom_lock struct we got but with the remid field filled in. */
5591 
5592 /* needs at least dlm_rcom + rcom_lock */
5593 int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5594 			    __le32 *rl_remid, __le32 *rl_result)
5595 {
5596 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5597 	struct dlm_rsb *r;
5598 	struct dlm_lkb *lkb;
5599 	uint32_t remid = 0;
5600 	int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5601 	int error;
5602 
5603 	/* init rl_remid with rcom lock rl_remid */
5604 	*rl_remid = rl->rl_remid;
5605 
5606 	if (rl->rl_parent_lkid) {
5607 		error = -EOPNOTSUPP;
5608 		goto out;
5609 	}
5610 
5611 	remid = le32_to_cpu(rl->rl_lkid);
5612 
5613 	/* In general we expect the rsb returned to be R_MASTER, but we don't
5614 	   have to require it.  Recovery of masters on one node can overlap
5615 	   recovery of locks on another node, so one node can send us MSTCPY
5616 	   locks before we've made ourselves master of this rsb.  We can still
5617 	   add new MSTCPY locks that we receive here without any harm; when
5618 	   we make ourselves master, dlm_recover_masters() won't touch the
5619 	   MSTCPY locks we've received early. */
5620 
5621 	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5622 			 from_nodeid, R_RECEIVE_RECOVER, &r);
5623 	if (error)
5624 		goto out;
5625 
5626 	lock_rsb(r);
5627 
5628 	if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5629 		log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5630 			  from_nodeid, remid);
5631 		error = -EBADR;
5632 		goto out_unlock;
5633 	}
5634 
5635 	lkb = search_remid(r, from_nodeid, remid);
5636 	if (lkb) {
5637 		error = -EEXIST;
5638 		goto out_remid;
5639 	}
5640 
5641 	error = create_lkb(ls, &lkb);
5642 	if (error)
5643 		goto out_unlock;
5644 
5645 	error = receive_rcom_lock_args(ls, lkb, r, rc);
5646 	if (error) {
5647 		__put_lkb(ls, lkb);
5648 		goto out_unlock;
5649 	}
5650 
5651 	attach_lkb(r, lkb);
5652 	add_lkb(r, lkb, rl->rl_status);
5653 	ls->ls_recover_locks_in++;
5654 
5655 	if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5656 		rsb_set_flag(r, RSB_RECOVER_GRANT);
5657 
5658  out_remid:
5659 	/* this is the new value returned to the lock holder for
5660 	   saving in its process-copy lkb */
5661 	*rl_remid = cpu_to_le32(lkb->lkb_id);
5662 
5663 	lkb->lkb_recover_seq = ls->ls_recover_seq;
5664 
5665  out_unlock:
5666 	unlock_rsb(r);
5667 	put_rsb(r);
5668  out:
5669 	if (error && error != -EEXIST)
5670 		log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5671 			  from_nodeid, remid, error);
5672 	*rl_result = cpu_to_le32(error);
5673 	return error;
5674 }
5675 
5676 /* needs at least dlm_rcom + rcom_lock */
5677 int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5678 			     uint64_t seq)
5679 {
5680 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5681 	struct dlm_rsb *r;
5682 	struct dlm_lkb *lkb;
5683 	uint32_t lkid, remid;
5684 	int error, result;
5685 
5686 	lkid = le32_to_cpu(rl->rl_lkid);
5687 	remid = le32_to_cpu(rl->rl_remid);
5688 	result = le32_to_cpu(rl->rl_result);
5689 
5690 	error = find_lkb(ls, lkid, &lkb);
5691 	if (error) {
5692 		log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5693 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5694 			  result);
5695 		return error;
5696 	}
5697 
5698 	r = lkb->lkb_resource;
5699 	hold_rsb(r);
5700 	lock_rsb(r);
5701 
5702 	if (!is_process_copy(lkb)) {
5703 		log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5704 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5705 			  result);
5706 		dlm_dump_rsb(r);
5707 		unlock_rsb(r);
5708 		put_rsb(r);
5709 		dlm_put_lkb(lkb);
5710 		return -EINVAL;
5711 	}
5712 
5713 	switch (result) {
5714 	case -EBADR:
5715 		/* There's a chance the new master received our lock before
5716 		   dlm_recover_master_reply(); this wouldn't happen if we did
5717 		   a barrier between recover_masters and recover_locks. */
5718 
5719 		log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5720 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5721 			  result);
5722 
5723 		dlm_send_rcom_lock(r, lkb, seq);
5724 		goto out;
5725 	case -EEXIST:
5726 	case 0:
5727 		lkb->lkb_remid = remid;
5728 		break;
5729 	default:
5730 		log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5731 			  lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5732 			  result);
5733 	}
5734 
5735 	/* an ack for dlm_recover_locks() which waits for replies from
5736 	   all the locks it sends to new masters */
5737 	dlm_recovered_lock(r);
5738  out:
5739 	unlock_rsb(r);
5740 	put_rsb(r);
5741 	dlm_put_lkb(lkb);
5742 
5743 	return 0;
5744 }
5745 
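/* Request a new lock on behalf of a userspace caller.  The dlm_user_args
   are attached to the lkb and freed with it; on success the lkb is added to
   the process's list of owned locks. */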
5746 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5747 		     int mode, uint32_t flags, void *name, unsigned int namelen)
5748 {
5749 	struct dlm_lkb *lkb;
5750 	struct dlm_args args;
5751 	bool do_put = true;
5752 	int error;
5753 
5754 	dlm_lock_recovery(ls);
5755 
5756 	error = create_lkb(ls, &lkb);
5757 	if (error) {
5758 		kfree(ua);
5759 		goto out;
5760 	}
5761 
5762 	trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
5763 
5764 	if (flags & DLM_LKF_VALBLK) {
5765 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5766 		if (!ua->lksb.sb_lvbptr) {
5767 			kfree(ua);
5768 			error = -ENOMEM;
5769 			goto out_put;
5770 		}
5771 	}
5772 	error = set_lock_args(mode, &ua->lksb, flags, namelen, fake_astfn, ua,
5773 			      fake_bastfn, &args);
5774 	if (error) {
5775 		kfree(ua->lksb.sb_lvbptr);
5776 		ua->lksb.sb_lvbptr = NULL;
5777 		kfree(ua);
5778 		goto out_put;
5779 	}
5780 
5781 	/* After ua is attached to lkb it will be freed by dlm_free_lkb().
5782 	   When DLM_DFL_USER_BIT is set, the dlm knows that this is a userspace
5783 	   lock and that lkb_astparam is the dlm_user_args structure. */
5784 	set_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags);
5785 	error = request_lock(ls, lkb, name, namelen, &args);
5786 
5787 	switch (error) {
5788 	case 0:
5789 		break;
5790 	case -EINPROGRESS:
5791 		error = 0;
5792 		break;
5793 	case -EAGAIN:
5794 		error = 0;
5795 		fallthrough;
5796 	default:
5797 		goto out_put;
5798 	}
5799 
5800 	/* add this new lkb to the per-process list of locks */
5801 	spin_lock_bh(&ua->proc->locks_spin);
5802 	hold_lkb(lkb);
5803 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5804 	spin_unlock_bh(&ua->proc->locks_spin);
5805 	do_put = false;
5806  out_put:
5807 	trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
5808 	if (do_put)
5809 		__put_lkb(ls, lkb);
5810  out:
5811 	dlm_unlock_recovery(ls);
5812 	return error;
5813 }
5814 
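/* Convert an existing userspace lock identified by lkid, updating the
   callback parameters and lvb supplied by the caller. */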
5815 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5816 		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
5817 {
5818 	struct dlm_lkb *lkb;
5819 	struct dlm_args args;
5820 	struct dlm_user_args *ua;
5821 	int error;
5822 
5823 	dlm_lock_recovery(ls);
5824 
5825 	error = find_lkb(ls, lkid, &lkb);
5826 	if (error)
5827 		goto out;
5828 
5829 	trace_dlm_lock_start(ls, lkb, NULL, 0, mode, flags);
5830 
5831 	/* user can change the params on its lock when it converts it, or
5832 	   add an lvb that didn't exist before */
5833 
5834 	ua = lkb->lkb_ua;
5835 
5836 	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5837 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5838 		if (!ua->lksb.sb_lvbptr) {
5839 			error = -ENOMEM;
5840 			goto out_put;
5841 		}
5842 	}
5843 	if (lvb_in && ua->lksb.sb_lvbptr)
5844 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5845 
5846 	ua->xid = ua_tmp->xid;
5847 	ua->castparam = ua_tmp->castparam;
5848 	ua->castaddr = ua_tmp->castaddr;
5849 	ua->bastparam = ua_tmp->bastparam;
5850 	ua->bastaddr = ua_tmp->bastaddr;
5851 	ua->user_lksb = ua_tmp->user_lksb;
5852 
5853 	error = set_lock_args(mode, &ua->lksb, flags, 0, fake_astfn, ua,
5854 			      fake_bastfn, &args);
5855 	if (error)
5856 		goto out_put;
5857 
5858 	error = convert_lock(ls, lkb, &args);
5859 
5860 	if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5861 		error = 0;
5862  out_put:
5863 	trace_dlm_lock_end(ls, lkb, NULL, 0, mode, flags, error, false);
5864 	dlm_put_lkb(lkb);
5865  out:
5866 	dlm_unlock_recovery(ls);
5867 	kfree(ua_tmp);
5868 	return error;
5869 }
5870 
5871 /*
5872  * The caller asks for an orphan lock on a given resource with a given mode.
5873  * If a matching lock exists, it's moved to the owner's list of locks and
5874  * the lkid is returned.
5875  */
5876 
5877 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5878 		     int mode, uint32_t flags, void *name, unsigned int namelen,
5879 		     uint32_t *lkid)
5880 {
5881 	struct dlm_lkb *lkb = NULL, *iter;
5882 	struct dlm_user_args *ua;
5883 	int found_other_mode = 0;
5884 	int rv = 0;
5885 
5886 	spin_lock_bh(&ls->ls_orphans_lock);
5887 	list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
5888 		if (iter->lkb_resource->res_length != namelen)
5889 			continue;
5890 		if (memcmp(iter->lkb_resource->res_name, name, namelen))
5891 			continue;
5892 		if (iter->lkb_grmode != mode) {
5893 			found_other_mode = 1;
5894 			continue;
5895 		}
5896 
5897 		lkb = iter;
5898 		list_del_init(&iter->lkb_ownqueue);
5899 		clear_bit(DLM_DFL_ORPHAN_BIT, &iter->lkb_dflags);
5900 		*lkid = iter->lkb_id;
5901 		break;
5902 	}
5903 	spin_unlock_bh(&ls->ls_orphans_lock);
5904 
5905 	if (!lkb && found_other_mode) {
5906 		rv = -EAGAIN;
5907 		goto out;
5908 	}
5909 
5910 	if (!lkb) {
5911 		rv = -ENOENT;
5912 		goto out;
5913 	}
5914 
5915 	lkb->lkb_exflags = flags;
5916 	lkb->lkb_ownpid = (int) current->pid;
5917 
5918 	ua = lkb->lkb_ua;
5919 
5920 	ua->proc = ua_tmp->proc;
5921 	ua->xid = ua_tmp->xid;
5922 	ua->castparam = ua_tmp->castparam;
5923 	ua->castaddr = ua_tmp->castaddr;
5924 	ua->bastparam = ua_tmp->bastparam;
5925 	ua->bastaddr = ua_tmp->bastaddr;
5926 	ua->user_lksb = ua_tmp->user_lksb;
5927 
5928 	/*
5929 	 * The lkb reference from the ls_orphans list was not
5930 	 * removed above, and is now considered the reference
5931 	 * for the proc locks list.
5932 	 */
5933 
5934 	spin_lock_bh(&ua->proc->locks_spin);
5935 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5936 	spin_unlock_bh(&ua->proc->locks_spin);
5937  out:
5938 	kfree(ua_tmp);
5939 	return rv;
5940 }
5941 
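/* Unlock a userspace lock.  On success the lkb is moved from the process's
   locks list to its unlocking list (unless a callback has already removed
   it from the locks list). */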
5942 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5943 		    uint32_t flags, uint32_t lkid, char *lvb_in)
5944 {
5945 	struct dlm_lkb *lkb;
5946 	struct dlm_args args;
5947 	struct dlm_user_args *ua;
5948 	int error;
5949 
5950 	dlm_lock_recovery(ls);
5951 
5952 	error = find_lkb(ls, lkid, &lkb);
5953 	if (error)
5954 		goto out;
5955 
5956 	trace_dlm_unlock_start(ls, lkb, flags);
5957 
5958 	ua = lkb->lkb_ua;
5959 
5960 	if (lvb_in && ua->lksb.sb_lvbptr)
5961 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5962 	if (ua_tmp->castparam)
5963 		ua->castparam = ua_tmp->castparam;
5964 	ua->user_lksb = ua_tmp->user_lksb;
5965 
5966 	error = set_unlock_args(flags, ua, &args);
5967 	if (error)
5968 		goto out_put;
5969 
5970 	error = unlock_lock(ls, lkb, &args);
5971 
5972 	if (error == -DLM_EUNLOCK)
5973 		error = 0;
5974 	/* from validate_unlock_args() */
5975 	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5976 		error = 0;
5977 	if (error)
5978 		goto out_put;
5979 
5980 	spin_lock_bh(&ua->proc->locks_spin);
5981 	/* dlm_user_add_cb() may have already taken lkb off the proc list */
5982 	if (!list_empty(&lkb->lkb_ownqueue))
5983 		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5984 	spin_unlock_bh(&ua->proc->locks_spin);
5985  out_put:
5986 	trace_dlm_unlock_end(ls, lkb, flags, error);
5987 	dlm_put_lkb(lkb);
5988  out:
5989 	dlm_unlock_recovery(ls);
5990 	kfree(ua_tmp);
5991 	return error;
5992 }
5993 
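/* Cancel an in-progress userspace lock request or conversion. */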
5994 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5995 		    uint32_t flags, uint32_t lkid)
5996 {
5997 	struct dlm_lkb *lkb;
5998 	struct dlm_args args;
5999 	struct dlm_user_args *ua;
6000 	int error;
6001 
6002 	dlm_lock_recovery(ls);
6003 
6004 	error = find_lkb(ls, lkid, &lkb);
6005 	if (error)
6006 		goto out;
6007 
6008 	trace_dlm_unlock_start(ls, lkb, flags);
6009 
6010 	ua = lkb->lkb_ua;
6011 	if (ua_tmp->castparam)
6012 		ua->castparam = ua_tmp->castparam;
6013 	ua->user_lksb = ua_tmp->user_lksb;
6014 
6015 	error = set_unlock_args(flags, ua, &args);
6016 	if (error)
6017 		goto out_put;
6018 
6019 	error = cancel_lock(ls, lkb, &args);
6020 
6021 	if (error == -DLM_ECANCEL)
6022 		error = 0;
6023 	/* from validate_unlock_args() */
6024 	if (error == -EBUSY)
6025 		error = 0;
6026  out_put:
6027 	trace_dlm_unlock_end(ls, lkb, flags, error);
6028 	dlm_put_lkb(lkb);
6029  out:
6030 	dlm_unlock_recovery(ls);
6031 	kfree(ua_tmp);
6032 	return error;
6033 }
6034 
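/* Cancel a userspace conversion chosen as a deadlock victim: the same as
   cancel_lock(), but DLM_IFL_DEADLOCK_CANCEL_BIT is set while holding
   lock_rsb. */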
6035 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
6036 {
6037 	struct dlm_lkb *lkb;
6038 	struct dlm_args args;
6039 	struct dlm_user_args *ua;
6040 	struct dlm_rsb *r;
6041 	int error;
6042 
6043 	dlm_lock_recovery(ls);
6044 
6045 	error = find_lkb(ls, lkid, &lkb);
6046 	if (error)
6047 		goto out;
6048 
6049 	trace_dlm_unlock_start(ls, lkb, flags);
6050 
6051 	ua = lkb->lkb_ua;
6052 
6053 	error = set_unlock_args(flags, ua, &args);
6054 	if (error)
6055 		goto out_put;
6056 
6057 	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
6058 
6059 	r = lkb->lkb_resource;
6060 	hold_rsb(r);
6061 	lock_rsb(r);
6062 
6063 	error = validate_unlock_args(lkb, &args);
6064 	if (error)
6065 		goto out_r;
6066 	set_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags);
6067 
6068 	error = _cancel_lock(r, lkb);
6069  out_r:
6070 	unlock_rsb(r);
6071 	put_rsb(r);
6072 
6073 	if (error == -DLM_ECANCEL)
6074 		error = 0;
6075 	/* from validate_unlock_args() */
6076 	if (error == -EBUSY)
6077 		error = 0;
6078  out_put:
6079 	trace_dlm_unlock_end(ls, lkb, flags, error);
6080 	dlm_put_lkb(lkb);
6081  out:
6082 	dlm_unlock_recovery(ls);
6083 	return error;
6084 }
6085 
6086 /* lkb's that are removed from the waiters list by revert are just left on the
6087    orphans list with the granted orphan locks, to be freed by purge */
6088 
6089 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6090 {
6091 	struct dlm_args args;
6092 	int error;
6093 
6094 	hold_lkb(lkb); /* reference for the ls_orphans list */
6095 	spin_lock_bh(&ls->ls_orphans_lock);
6096 	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6097 	spin_unlock_bh(&ls->ls_orphans_lock);
6098 
6099 	set_unlock_args(0, lkb->lkb_ua, &args);
6100 
6101 	error = cancel_lock(ls, lkb, &args);
6102 	if (error == -DLM_ECANCEL)
6103 		error = 0;
6104 	return error;
6105 }
6106 
6107 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
6108    granted.  Regardless of what rsb queue the lock is on, it's removed and
6109    freed.  The IVVALBLK flag causes the lvb on the resource to be invalidated
6110    if our lock is PW/EX (it's ignored if our granted mode is smaller.) */
6111 
6112 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6113 {
6114 	struct dlm_args args;
6115 	int error;
6116 
6117 	set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
6118 			lkb->lkb_ua, &args);
6119 
6120 	error = unlock_lock(ls, lkb, &args);
6121 	if (error == -DLM_EUNLOCK)
6122 		error = 0;
6123 	return error;
6124 }
6125 
6126 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
6127    (which does lock_rsb) due to deadlock with receiving a message that does
6128    lock_rsb followed by dlm_user_add_cb() */
6129 
6130 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
6131 				     struct dlm_user_proc *proc)
6132 {
6133 	struct dlm_lkb *lkb = NULL;
6134 
6135 	spin_lock_bh(&ls->ls_clear_proc_locks);
6136 	if (list_empty(&proc->locks))
6137 		goto out;
6138 
6139 	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
6140 	list_del_init(&lkb->lkb_ownqueue);
6141 
6142 	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6143 		set_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags);
6144 	else
6145 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6146  out:
6147 	spin_unlock_bh(&ls->ls_clear_proc_locks);
6148 	return lkb;
6149 }
6150 
6151 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
6152    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
6153    which we clear here. */
6154 
6155 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
6156    list, and no more device_writes should add lkb's to proc->locks list; so we
6157    shouldn't need to take asts_spin or locks_spin here.  this assumes that
6158    device reads/writes/closes are serialized -- FIXME: we may need to serialize
6159    them ourselves. */
6160 
6161 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6162 {
6163 	struct dlm_callback *cb, *cb_safe;
6164 	struct dlm_lkb *lkb, *safe;
6165 
6166 	dlm_lock_recovery(ls);
6167 
6168 	while (1) {
6169 		lkb = del_proc_lock(ls, proc);
6170 		if (!lkb)
6171 			break;
6172 		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6173 			orphan_proc_lock(ls, lkb);
6174 		else
6175 			unlock_proc_lock(ls, lkb);
6176 
6177 		/* this removes the reference for the proc->locks list
6178 		   added by dlm_user_request, it may result in the lkb
6179 		   being freed */
6180 
6181 		dlm_put_lkb(lkb);
6182 	}
6183 
6184 	spin_lock_bh(&ls->ls_clear_proc_locks);
6185 
6186 	/* in-progress unlocks */
6187 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6188 		list_del_init(&lkb->lkb_ownqueue);
6189 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6190 		dlm_put_lkb(lkb);
6191 	}
6192 
6193 	list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6194 		list_del(&cb->list);
6195 		dlm_free_cb(cb);
6196 	}
6197 
6198 	spin_unlock_bh(&ls->ls_clear_proc_locks);
6199 	dlm_unlock_recovery(ls);
6200 }
6201 
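/* Purge all locks owned by the calling process: force-unlock everything on
   the proc's locks and unlocking lists and free any callbacks still queued
   on proc->asts. */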
6202 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6203 {
6204 	struct dlm_callback *cb, *cb_safe;
6205 	struct dlm_lkb *lkb, *safe;
6206 
6207 	while (1) {
6208 		lkb = NULL;
6209 		spin_lock_bh(&proc->locks_spin);
6210 		if (!list_empty(&proc->locks)) {
6211 			lkb = list_entry(proc->locks.next, struct dlm_lkb,
6212 					 lkb_ownqueue);
6213 			list_del_init(&lkb->lkb_ownqueue);
6214 		}
6215 		spin_unlock_bh(&proc->locks_spin);
6216 
6217 		if (!lkb)
6218 			break;
6219 
6220 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6221 		unlock_proc_lock(ls, lkb);
6222 		dlm_put_lkb(lkb); /* ref from proc->locks list */
6223 	}
6224 
6225 	spin_lock_bh(&proc->locks_spin);
6226 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6227 		list_del_init(&lkb->lkb_ownqueue);
6228 		set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6229 		dlm_put_lkb(lkb);
6230 	}
6231 	spin_unlock_bh(&proc->locks_spin);
6232 
6233 	spin_lock_bh(&proc->asts_spin);
6234 	list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6235 		list_del(&cb->list);
6236 		dlm_free_cb(cb);
6237 	}
6238 	spin_unlock_bh(&proc->asts_spin);
6239 }
6240 
6241 /* pid of 0 means purge all orphans */
6242 
6243 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6244 {
6245 	struct dlm_lkb *lkb, *safe;
6246 
6247 	spin_lock_bh(&ls->ls_orphans_lock);
6248 	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6249 		if (pid && lkb->lkb_ownpid != pid)
6250 			continue;
6251 		unlock_proc_lock(ls, lkb);
6252 		list_del_init(&lkb->lkb_ownqueue);
6253 		dlm_put_lkb(lkb);
6254 	}
6255 	spin_unlock_bh(&ls->ls_orphans_lock);
6256 }
6257 
6258 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6259 {
6260 	struct dlm_message *ms;
6261 	struct dlm_mhandle *mh;
6262 	int error;
6263 
6264 	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6265 				DLM_MSG_PURGE, &ms, &mh);
6266 	if (error)
6267 		return error;
6268 	ms->m_nodeid = cpu_to_le32(nodeid);
6269 	ms->m_pid = cpu_to_le32(pid);
6270 
6271 	return send_message(mh, ms, NULL, 0);
6272 }
6273 
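/* Purge locks for the given nodeid/pid.  A remote nodeid results in a
   DLM_MSG_PURGE being sent to that node; locally, the caller's own pid
   purges its proc locks, while any other pid purges matching orphans
   (see do_purge). */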
6274 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6275 		   int nodeid, int pid)
6276 {
6277 	int error = 0;
6278 
6279 	if (nodeid && (nodeid != dlm_our_nodeid())) {
6280 		error = send_purge(ls, nodeid, pid);
6281 	} else {
6282 		dlm_lock_recovery(ls);
6283 		if (pid == current->pid)
6284 			purge_proc_locks(ls, proc);
6285 		else
6286 			do_purge(ls, nodeid, pid);
6287 		dlm_unlock_recovery(ls);
6288 	}
6289 	return error;
6290 }
6291 
6292 /* debug functionality */
6293 int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
6294 		      int lkb_nodeid, unsigned int lkb_dflags, int lkb_status)
6295 {
6296 	struct dlm_lksb *lksb;
6297 	struct dlm_lkb *lkb;
6298 	struct dlm_rsb *r;
6299 	int error;
6300 
6301 	/* we currently can't set a valid user lock */
6302 	if (lkb_dflags & BIT(DLM_DFL_USER_BIT))
6303 		return -EOPNOTSUPP;
6304 
6305 	lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
6306 	if (!lksb)
6307 		return -ENOMEM;
6308 
6309 	error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
6310 	if (error) {
6311 		kfree(lksb);
6312 		return error;
6313 	}
6314 
6315 	dlm_set_dflags_val(lkb, lkb_dflags);
6316 	lkb->lkb_nodeid = lkb_nodeid;
6317 	lkb->lkb_lksb = lksb;
6318 	/* user specific pointer, just don't have it NULL for kernel locks */
6319 	if (~lkb_dflags & BIT(DLM_DFL_USER_BIT))
6320 		lkb->lkb_astparam = (void *)0xDEADBEEF;
6321 
6322 	error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
6323 	if (error) {
6324 		kfree(lksb);
6325 		__put_lkb(ls, lkb);
6326 		return error;
6327 	}
6328 
6329 	lock_rsb(r);
6330 	attach_lkb(r, lkb);
6331 	add_lkb(r, lkb, lkb_status);
6332 	unlock_rsb(r);
6333 	put_rsb(r);
6334 
6335 	return 0;
6336 }
6337 
6338 int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
6339 				 int mstype, int to_nodeid)
6340 {
6341 	struct dlm_lkb *lkb;
6342 	int error;
6343 
6344 	error = find_lkb(ls, lkb_id, &lkb);
6345 	if (error)
6346 		return error;
6347 
6348 	add_to_waiters(lkb, mstype, to_nodeid);
6349 	dlm_put_lkb(lkb);
6350 	return 0;
6351 }
6352 
6353