1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 ** Copyright (C) 2005-2010 Red Hat, Inc. All rights reserved.
6 **
7 **
8 *******************************************************************************
9 ******************************************************************************/
10
11 /* Central locking logic has four stages:
12
13 dlm_lock()
14 dlm_unlock()
15
16 request_lock(ls, lkb)
17 convert_lock(ls, lkb)
18 unlock_lock(ls, lkb)
19 cancel_lock(ls, lkb)
20
21 _request_lock(r, lkb)
22 _convert_lock(r, lkb)
23 _unlock_lock(r, lkb)
24 _cancel_lock(r, lkb)
25
26 do_request(r, lkb)
27 do_convert(r, lkb)
28 do_unlock(r, lkb)
29 do_cancel(r, lkb)
30
31 Stage 1 (lock, unlock) is mainly about checking input args and
32 splitting into one of the four main operations:
33
34 dlm_lock = request_lock
35 dlm_lock+CONVERT = convert_lock
36 dlm_unlock = unlock_lock
37 dlm_unlock+CANCEL = cancel_lock
38
39 Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
40 provided to the next stage.
41
42 Stage 3, _xxxx_lock(), determines if the operation is local or remote.
43 When remote, it calls send_xxxx(), when local it calls do_xxxx().
44
45 Stage 4, do_xxxx(), is the guts of the operation. It manipulates the
46 given rsb and lkb and queues callbacks.
47
48 For remote operations, send_xxxx() results in the corresponding do_xxxx()
49 function being executed on the remote node. The connecting send/receive
50 calls on local (L) and remote (R) nodes:
51
52 L: send_xxxx() -> R: receive_xxxx()
53 R: do_xxxx()
54 L: receive_xxxx_reply() <- R: send_xxxx_reply()
55 */
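/*
 * A minimal caller-side sketch (hypothetical, not part of this file and not
 * compiled) showing how the dlm_lock()/dlm_unlock() flags select the
 * stage-2 operation above.  The lockspace handle "ls_handle" and the
 * my_ast/my_bast/my_arg callbacks are placeholders.
 */
#if 0
	struct dlm_lksb lksb = {};

	/* new request -> request_lock() */
	dlm_lock(ls_handle, DLM_LOCK_PR, &lksb, 0, "resname", 7, 0,
		 my_ast, my_arg, my_bast);

	/* convert an existing lock -> convert_lock() */
	dlm_lock(ls_handle, DLM_LOCK_EX, &lksb, DLM_LKF_CONVERT, "resname", 7,
		 0, my_ast, my_arg, my_bast);

	/* unlock -> unlock_lock() */
	dlm_unlock(ls_handle, lksb.sb_lkid, 0, &lksb, my_arg);

	/* cancel an in-progress request/convert -> cancel_lock() */
	dlm_unlock(ls_handle, lksb.sb_lkid, DLM_LKF_CANCEL, &lksb, my_arg);
#endif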
56 #include <trace/events/dlm.h>
57
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "midcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89 const struct dlm_message *ms, bool local);
90 static int receive_extralen(const struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void deactivate_rsb(struct kref *kref);
93
94 /*
95  * Lock compatibility matrix - thanks Steve
96 * UN = Unlocked state. Not really a state, used as a flag
97 * PD = Padding. Used to make the matrix a nice power of two in size
98 * Other states are the same as the VMS DLM.
99 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same)
100 */
101
102 static const int __dlm_compat_matrix[8][8] = {
103 /* UN NL CR CW PR PW EX PD */
104 {1, 1, 1, 1, 1, 1, 1, 0}, /* UN */
105 {1, 1, 1, 1, 1, 1, 1, 0}, /* NL */
106 {1, 1, 1, 1, 1, 1, 0, 0}, /* CR */
107 {1, 1, 1, 1, 0, 0, 0, 0}, /* CW */
108 {1, 1, 1, 0, 1, 0, 0, 0}, /* PR */
109 {1, 1, 1, 0, 0, 0, 0, 0}, /* PW */
110 {1, 1, 0, 0, 0, 0, 0, 0}, /* EX */
111 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */
112 };
113
114 /*
115 * This defines the direction of transfer of LVB data.
116 * Granted mode is the row; requested mode is the column.
117 * Usage: matrix[grmode+1][rqmode+1]
118 * 1 = LVB is returned to the caller
119 * 0 = LVB is written to the resource
120 * -1 = nothing happens to the LVB
121 */
122
123 const int dlm_lvb_operations[8][8] = {
124 /* UN NL CR CW PR PW EX PD*/
125 { -1, 1, 1, 1, 1, 1, 1, -1 }, /* UN */
126 { -1, 1, 1, 1, 1, 1, 1, 0 }, /* NL */
127 { -1, -1, 1, 1, 1, 1, 1, 0 }, /* CR */
128 { -1, -1, -1, 1, 1, 1, 1, 0 }, /* CW */
129 { -1, -1, -1, -1, 1, 1, 1, 0 }, /* PR */
130 { -1, 0, 0, 0, 0, 0, 1, 0 }, /* PW */
131 { -1, 0, 0, 0, 0, 0, 0, 0 }, /* EX */
132 { -1, 0, 0, 0, 0, 0, 0, 0 } /* PD */
133 };
134
135 #define modes_compat(gr, rq) \
136 __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
137
138 int dlm_modes_compat(int mode1, int mode2)
139 {
140 return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
141 }
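
/*
 * Hypothetical usage sketch (not compiled): per __dlm_compat_matrix above,
 * PR is compatible with PR but not with EX.
 */
#if 0
	WARN_ON(dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR) != 1);
	WARN_ON(dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_EX) != 0);
#endif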
142
143 /*
144 * Compatibility matrix for conversions with QUECVT set.
145 * Granted mode is the row; requested mode is the column.
146 * Usage: matrix[grmode+1][rqmode+1]
147 */
148
149 static const int __quecvt_compat_matrix[8][8] = {
150 /* UN NL CR CW PR PW EX PD */
151 {0, 0, 0, 0, 0, 0, 0, 0}, /* UN */
152 {0, 0, 1, 1, 1, 1, 1, 0}, /* NL */
153 {0, 0, 0, 1, 1, 1, 1, 0}, /* CR */
154 {0, 0, 0, 0, 1, 1, 1, 0}, /* CW */
155 {0, 0, 0, 1, 0, 1, 1, 0}, /* PR */
156 {0, 0, 0, 0, 0, 0, 1, 0}, /* PW */
157 {0, 0, 0, 0, 0, 0, 0, 0}, /* EX */
158 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */
159 };
160
161 void dlm_print_lkb(struct dlm_lkb *lkb)
162 {
163 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
164 "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
165 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166 dlm_iflags_val(lkb), lkb->lkb_status, lkb->lkb_rqmode,
167 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
168 (unsigned long long)lkb->lkb_recover_seq);
169 }
170
171 static void dlm_print_rsb(struct dlm_rsb *r)
172 {
173 printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
174 "rlc %d name %s\n",
175 r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
176 r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
177 r->res_name);
178 }
179
180 void dlm_dump_rsb(struct dlm_rsb *r)
181 {
182 struct dlm_lkb *lkb;
183
184 dlm_print_rsb(r);
185
186 printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
187 list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
188 printk(KERN_ERR "rsb lookup list\n");
189 list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
190 dlm_print_lkb(lkb);
191 printk(KERN_ERR "rsb grant queue:\n");
192 list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
193 dlm_print_lkb(lkb);
194 printk(KERN_ERR "rsb convert queue:\n");
195 list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
196 dlm_print_lkb(lkb);
197 printk(KERN_ERR "rsb wait queue:\n");
198 list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
199 dlm_print_lkb(lkb);
200 }
201
202 /* Threads cannot use the lockspace while it's being recovered */
203
204 void dlm_lock_recovery(struct dlm_ls *ls)
205 {
206 down_read(&ls->ls_in_recovery);
207 }
208
209 void dlm_unlock_recovery(struct dlm_ls *ls)
210 {
211 up_read(&ls->ls_in_recovery);
212 }
213
214 int dlm_lock_recovery_try(struct dlm_ls *ls)
215 {
216 return down_read_trylock(&ls->ls_in_recovery);
217 }
218
219 static inline int can_be_queued(struct dlm_lkb *lkb)
220 {
221 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
222 }
223
224 static inline int force_blocking_asts(struct dlm_lkb *lkb)
225 {
226 return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
227 }
228
229 static inline int is_demoted(struct dlm_lkb *lkb)
230 {
231 return test_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
232 }
233
234 static inline int is_altmode(struct dlm_lkb *lkb)
235 {
236 return test_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
237 }
238
239 static inline int is_granted(struct dlm_lkb *lkb)
240 {
241 return (lkb->lkb_status == DLM_LKSTS_GRANTED);
242 }
243
244 static inline int is_remote(struct dlm_rsb *r)
245 {
246 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
247 return !!r->res_nodeid;
248 }
249
250 static inline int is_process_copy(struct dlm_lkb *lkb)
251 {
252 return lkb->lkb_nodeid &&
253 !test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
254 }
255
256 static inline int is_master_copy(struct dlm_lkb *lkb)
257 {
258 return test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
259 }
260
261 static inline int middle_conversion(struct dlm_lkb *lkb)
262 {
263 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265 return 1;
266 return 0;
267 }
268
269 static inline int down_conversion(struct dlm_lkb *lkb)
270 {
271 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272 }
273
274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
275 {
276 return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
277 }
278
279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
280 {
281 return test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
282 }
283
284 static inline int is_overlap(struct dlm_lkb *lkb)
285 {
286 return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags) ||
287 test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
288 }
289
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291 {
292 if (is_master_copy(lkb))
293 return;
294
295 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
296
297 if (rv == -DLM_ECANCEL &&
298 test_and_clear_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags))
299 rv = -EDEADLK;
300
301 dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, dlm_sbflags_val(lkb));
302 }
303
304 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
305 {
306 queue_cast(r, lkb,
307 is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
308 }
309
310 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
311 {
312 if (is_master_copy(lkb)) {
313 send_bast(r, lkb, rqmode);
314 } else {
315 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
316 }
317 }
318
319 /*
320 * Basic operations on rsb's and lkb's
321 */
322
323 static inline unsigned long rsb_toss_jiffies(void)
324 {
325 return jiffies + (READ_ONCE(dlm_config.ci_toss_secs) * HZ);
326 }
327
328 /* This is only called to add a reference when the code already holds
329 a valid reference to the rsb, so there's no need for locking. */
330
331 static inline void hold_rsb(struct dlm_rsb *r)
332 {
333 /* inactive rsbs are not ref counted */
334 WARN_ON(rsb_flag(r, RSB_INACTIVE));
335 kref_get(&r->res_ref);
336 }
337
338 void dlm_hold_rsb(struct dlm_rsb *r)
339 {
340 hold_rsb(r);
341 }
342
343 /* TODO move this to lib/refcount.c */
344 static __must_check bool
345 dlm_refcount_dec_and_write_lock_bh(refcount_t *r, rwlock_t *lock)
346 __cond_acquires(lock)
347 {
348 if (refcount_dec_not_one(r))
349 return false;
350
351 write_lock_bh(lock);
352 if (!refcount_dec_and_test(r)) {
353 write_unlock_bh(lock);
354 return false;
355 }
356
357 return true;
358 }
359
360 /* TODO move this to include/linux/kref.h */
361 static inline int dlm_kref_put_write_lock_bh(struct kref *kref,
362 void (*release)(struct kref *kref),
363 rwlock_t *lock)
364 {
365 if (dlm_refcount_dec_and_write_lock_bh(&kref->refcount, lock)) {
366 release(kref);
367 return 1;
368 }
369
370 return 0;
371 }
372
373 static void put_rsb(struct dlm_rsb *r)
374 {
375 struct dlm_ls *ls = r->res_ls;
376 int rv;
377
378 rv = dlm_kref_put_write_lock_bh(&r->res_ref, deactivate_rsb,
379 &ls->ls_rsbtbl_lock);
380 if (rv)
381 write_unlock_bh(&ls->ls_rsbtbl_lock);
382 }
383
384 void dlm_put_rsb(struct dlm_rsb *r)
385 {
386 put_rsb(r);
387 }
388
389 /* Paired with timer_delete_sync() in dlm_ls_stop(): once recovery is
390  * triggered, no new scan timers are armed; resume_scan_timer() re-arms
391  * the timer when appropriate.
392  */
393 static void enable_scan_timer(struct dlm_ls *ls, unsigned long jiffies)
394 {
395 if (!dlm_locking_stopped(ls))
396 mod_timer(&ls->ls_scan_timer, jiffies);
397 }
398
399 /* This function tries to resume the scan timer when an rsb is on the
400  * scan list and no timer is pending. The first entry might currently
401  * be executing as the timer callback, but that is harmless: a timer
402  * that is queued again and then finds nothing to do simply returns.
403  * Should be a rare case.
404  */
405 void resume_scan_timer(struct dlm_ls *ls)
406 {
407 struct dlm_rsb *r;
408
409 spin_lock_bh(&ls->ls_scan_lock);
410 r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
411 res_scan_list);
412 if (r && !timer_pending(&ls->ls_scan_timer))
413 enable_scan_timer(ls, r->res_toss_time);
414 spin_unlock_bh(&ls->ls_scan_lock);
415 }
416
417 /* ls_rsbtbl_lock must be held */
418
419 static void del_scan(struct dlm_ls *ls, struct dlm_rsb *r)
420 {
421 struct dlm_rsb *first;
422
423 /* active rsbs should never be on the scan list */
424 WARN_ON(!rsb_flag(r, RSB_INACTIVE));
425
426 spin_lock_bh(&ls->ls_scan_lock);
427 r->res_toss_time = 0;
428
429 /* if the rsb is not queued do nothing */
430 if (list_empty(&r->res_scan_list))
431 goto out;
432
433 /* get the first element before delete */
434 first = list_first_entry(&ls->ls_scan_list, struct dlm_rsb,
435 res_scan_list);
436 list_del_init(&r->res_scan_list);
437 /* check if the first element was the rsb we deleted */
438 if (first == r) {
439 /* try to get the new first element, if the list
440 * is empty now try to delete the timer, if we are
441 * too late we don't care.
442 *
443 * if the list isn't empty and a new first element got
444 * in place, set the new timer expire time.
445 */
446 first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
447 res_scan_list);
448 if (!first)
449 timer_delete(&ls->ls_scan_timer);
450 else
451 enable_scan_timer(ls, first->res_toss_time);
452 }
453
454 out:
455 spin_unlock_bh(&ls->ls_scan_lock);
456 }
457
458 static void add_scan(struct dlm_ls *ls, struct dlm_rsb *r)
459 {
460 int our_nodeid = dlm_our_nodeid();
461 struct dlm_rsb *first;
462
463 /* A dir record for a remote master rsb should never be on the scan list. */
464 WARN_ON(!dlm_no_directory(ls) &&
465 (r->res_master_nodeid != our_nodeid) &&
466 (dlm_dir_nodeid(r) == our_nodeid));
467
468 /* An active rsb should never be on the scan list. */
469 WARN_ON(!rsb_flag(r, RSB_INACTIVE));
470
471 /* An rsb should not already be on the scan list. */
472 WARN_ON(!list_empty(&r->res_scan_list));
473
474 spin_lock_bh(&ls->ls_scan_lock);
475 /* set the new rsb absolute expire time in the rsb */
476 r->res_toss_time = rsb_toss_jiffies();
477 if (list_empty(&ls->ls_scan_list)) {
478 /* if the queue is empty add the element and it's
479 * our new expire time
480 */
481 list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
482 enable_scan_timer(ls, r->res_toss_time);
483 } else {
484 /* get the current first element, then add this rsb
485  * (which has the newest expire time) to the end of
486  * the queue. If the list was empty before, this rsb's
487  * expire time is our next expiration; if it wasn't,
488  * the current first element's expire time is.
489  */
490 first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
491 res_scan_list);
492 list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
493 if (!first)
494 enable_scan_timer(ls, r->res_toss_time);
495 else
496 enable_scan_timer(ls, first->res_toss_time);
497 }
498 spin_unlock_bh(&ls->ls_scan_lock);
499 }
500
501 /* If we hit contention, retry the trylock after 250 ms. If some
502  * other mod_timer() in between makes the timer expire earlier,
503  * we don't care; this retry only covers the unlikely case that
504  * nothing happened in that time.
505  */
506 #define DLM_TOSS_TIMER_RETRY (jiffies + msecs_to_jiffies(250))
507
508 /* Called by lockspace scan_timer to free unused rsb's. */
509
510 void dlm_rsb_scan(struct timer_list *timer)
511 {
512 struct dlm_ls *ls = from_timer(ls, timer, ls_scan_timer);
513 int our_nodeid = dlm_our_nodeid();
514 struct dlm_rsb *r;
515 int rv;
516
517 while (1) {
518 /* interruption point: leave the iteration when recovery
519  * waits for timer_delete_sync(); recovery will take care
520  * of deleting everything on the scan list.
521  */
522 if (dlm_locking_stopped(ls))
523 break;
524
525 rv = spin_trylock(&ls->ls_scan_lock);
526 if (!rv) {
527 /* rearm the retry timer */
528 enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
529 break;
530 }
531
532 r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
533 res_scan_list);
534 if (!r) {
535 /* the next add_scan will enable the timer again */
536 spin_unlock(&ls->ls_scan_lock);
537 break;
538 }
539
540 /*
541 * If the first rsb is not yet expired, then stop because the
542 * list is sorted with nearest expiration first.
543 */
544 if (time_before(jiffies, r->res_toss_time)) {
545 /* rearm with the next rsb to expire in the future */
546 enable_scan_timer(ls, r->res_toss_time);
547 spin_unlock(&ls->ls_scan_lock);
548 break;
549 }
550
551 /* find_rsb_dir/nodir take these locks in the reverse order,
552  * but this is only a trylock, so if we hit possible
553  * contention we simply try again later.
554  */
555 rv = write_trylock(&ls->ls_rsbtbl_lock);
556 if (!rv) {
557 spin_unlock(&ls->ls_scan_lock);
558 /* rearm the retry timer */
559 enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
560 break;
561 }
562
563 list_del(&r->res_slow_list);
564 rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
565 dlm_rhash_rsb_params);
566 rsb_clear_flag(r, RSB_HASHED);
567
568 /* ls_rsbtbl_lock is not needed when calling send_remove() */
569 write_unlock(&ls->ls_rsbtbl_lock);
570
571 list_del_init(&r->res_scan_list);
572 spin_unlock(&ls->ls_scan_lock);
573
574 /* An rsb that is a dir record for a remote master rsb
575 * cannot be removed, and should not have a timer enabled.
576 */
577 WARN_ON(!dlm_no_directory(ls) &&
578 (r->res_master_nodeid != our_nodeid) &&
579 (dlm_dir_nodeid(r) == our_nodeid));
580
581 /* We're the master of this rsb but we're not
582 * the directory record, so we need to tell the
583 * dir node to remove the dir record
584 */
585 if (!dlm_no_directory(ls) &&
586 (r->res_master_nodeid == our_nodeid) &&
587 (dlm_dir_nodeid(r) != our_nodeid))
588 send_remove(r);
589
590 free_inactive_rsb(r);
591 }
592 }
593
594 /* Allocate and initialize a new rsb struct with the given resource
595    name. Returns -ENOMEM if the allocation fails; otherwise the new
596    rsb is returned in *r_ret. */
597
598 static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
599 struct dlm_rsb **r_ret)
600 {
601 struct dlm_rsb *r;
602
603 r = dlm_allocate_rsb();
604 if (!r)
605 return -ENOMEM;
606
607 r->res_ls = ls;
608 r->res_length = len;
609 memcpy(r->res_name, name, len);
610 spin_lock_init(&r->res_lock);
611
612 INIT_LIST_HEAD(&r->res_lookup);
613 INIT_LIST_HEAD(&r->res_grantqueue);
614 INIT_LIST_HEAD(&r->res_convertqueue);
615 INIT_LIST_HEAD(&r->res_waitqueue);
616 INIT_LIST_HEAD(&r->res_root_list);
617 INIT_LIST_HEAD(&r->res_scan_list);
618 INIT_LIST_HEAD(&r->res_recover_list);
619 INIT_LIST_HEAD(&r->res_masters_list);
620
621 *r_ret = r;
622 return 0;
623 }
624
625 int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
626 struct dlm_rsb **r_ret)
627 {
628 char key[DLM_RESNAME_MAXLEN] = {};
629
630 memcpy(key, name, len);
631 *r_ret = rhashtable_lookup_fast(rhash, &key, dlm_rhash_rsb_params);
632 if (*r_ret)
633 return 0;
634
635 return -EBADR;
636 }
637
638 static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
639 {
640 int rv;
641
642 rv = rhashtable_insert_fast(rhash, &rsb->res_node,
643 dlm_rhash_rsb_params);
644 if (!rv)
645 rsb_set_flag(rsb, RSB_HASHED);
646
647 return rv;
648 }
649
650 /*
651 * Find rsb in rsbtbl and potentially create/add one
652 *
653 * Delaying the release of rsb's has a similar benefit to applications keeping
654 * NL locks on an rsb, but without the guarantee that the cached master value
655 * will still be valid when the rsb is reused. Apps aren't always smart enough
656 * to keep NL locks on an rsb that they may lock again shortly; this can lead
657 * to excessive master lookups and removals if we don't delay the release.
658 *
659 * Searching for an rsb means looking through both the normal list and toss
660 * list. When found on the toss list the rsb is moved to the normal list with
661 * ref count of 1; when found on normal list the ref count is incremented.
662 *
663 * rsb's on the keep list are being used locally and refcounted.
664 * rsb's on the toss list are not being used locally, and are not refcounted.
665 *
666 * The toss list rsb's were either
667 * - previously used locally but not any more (were on keep list, then
668 * moved to toss list when last refcount dropped)
669 * - created and put on toss list as a directory record for a lookup
670 * (we are the dir node for the res, but are not using the res right now,
671 * but some other node is)
672 *
673 * The purpose of find_rsb() is to return a refcounted rsb for local use.
674 * So, if the given rsb is on the toss list, it is moved to the keep list
675 * before being returned.
676 *
677 * deactivate_rsb() happens when all local usage of the rsb is done, i.e. no
678 * more refcounts exist, so the rsb is moved from the keep list to the
679 * toss list.
680 *
681 * rsb's on both keep and toss lists are used for doing a name to master
682 * lookups. rsb's that are in use locally (and being refcounted) are on
683 * the keep list, rsb's that are not in use locally (not refcounted) and
684 * only exist for name/master lookups are on the toss list.
685 *
686  * rsb's on the toss list whose dir_nodeid is not local can have stale
687 * name/master mappings. So, remote requests on such rsb's can potentially
688 * return with an error, which means the mapping is stale and needs to
689 * be updated with a new lookup. (The idea behind MASTER UNCERTAIN and
690 * first_lkid is to keep only a single outstanding request on an rsb
691 * while that rsb has a potentially stale master.)
692 */
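
/*
 * Hypothetical illustration (not compiled): a local request path obtaining
 * a refcounted rsb for "resname", then dropping the reference when done.
 */
#if 0
	struct dlm_rsb *r;
	int error;

	error = find_rsb(ls, "resname", 7, 0, R_REQUEST, &r);
	if (!error) {
		/* ... use r ... */
		put_rsb(r);
	}
#endif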
693
694 static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
695 uint32_t hash, int dir_nodeid, int from_nodeid,
696 unsigned int flags, struct dlm_rsb **r_ret)
697 {
698 struct dlm_rsb *r = NULL;
699 int our_nodeid = dlm_our_nodeid();
700 int from_local = 0;
701 int from_other = 0;
702 int from_dir = 0;
703 int create = 0;
704 int error;
705
706 if (flags & R_RECEIVE_REQUEST) {
707 if (from_nodeid == dir_nodeid)
708 from_dir = 1;
709 else
710 from_other = 1;
711 } else if (flags & R_REQUEST) {
712 from_local = 1;
713 }
714
715 /*
716 * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
717 * from_nodeid has sent us a lock in dlm_recover_locks, believing
718 * we're the new master. Our local recovery may not have set
719 * res_master_nodeid to our_nodeid yet, so allow either. Don't
720 * create the rsb; dlm_recover_process_copy() will handle EBADR
721 * by resending.
722 *
723 * If someone sends us a request, we are the dir node, and we do
724 * not find the rsb anywhere, then recreate it. This happens if
725 * someone sends us a request after we have removed/freed an rsb.
726 * (They sent a request instead of lookup because they are using
727 * an rsb taken from their scan list.)
728 */
729
730 if (from_local || from_dir ||
731 (from_other && (dir_nodeid == our_nodeid))) {
732 create = 1;
733 }
734
735 retry:
736 error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
737 if (error)
738 goto do_new;
739
740 /* check if the rsb is active under read lock - likely path */
741 read_lock_bh(&ls->ls_rsbtbl_lock);
742 if (!rsb_flag(r, RSB_HASHED)) {
743 read_unlock_bh(&ls->ls_rsbtbl_lock);
744 error = -EBADR;
745 goto do_new;
746 }
747
748 /*
749 * rsb is active, so we can't check master_nodeid without lock_rsb.
750 */
751
752 if (rsb_flag(r, RSB_INACTIVE)) {
753 read_unlock_bh(&ls->ls_rsbtbl_lock);
754 goto do_inactive;
755 }
756
757 kref_get(&r->res_ref);
758 read_unlock_bh(&ls->ls_rsbtbl_lock);
759 goto out;
760
761
762 do_inactive:
763 write_lock_bh(&ls->ls_rsbtbl_lock);
764
765 /*
766 * The expectation here is that the rsb will have HASHED and
767 * INACTIVE flags set, and that the rsb can be moved from
768 * inactive back to active again. However, between releasing
769 * the read lock and acquiring the write lock, this rsb could
770 * have been removed from rsbtbl, and had HASHED cleared, to
771 * be freed. To deal with this case, we would normally need
772 * to repeat dlm_search_rsb_tree while holding the write lock,
773 * but rcu allows us to simply check the HASHED flag, because
774 * the rcu read lock means the rsb will not be freed yet.
775 * If the HASHED flag is not set, then the rsb is being freed,
776 * so we add a new rsb struct. If the HASHED flag is set,
777 * and INACTIVE is not set, it means another thread has
778 * made the rsb active, as we're expecting to do here, and
779 * we just repeat the lookup (this will be very unlikely.)
780 */
781 if (rsb_flag(r, RSB_HASHED)) {
782 if (!rsb_flag(r, RSB_INACTIVE)) {
783 write_unlock_bh(&ls->ls_rsbtbl_lock);
784 goto retry;
785 }
786 } else {
787 write_unlock_bh(&ls->ls_rsbtbl_lock);
788 error = -EBADR;
789 goto do_new;
790 }
791
792 /*
793 * rsb found inactive (master_nodeid may be out of date unless
794 * we are the dir_nodeid or were the master) No other thread
795 * is using this rsb because it's inactive, so we can
796 * look at or update res_master_nodeid without lock_rsb.
797 */
798
799 if ((r->res_master_nodeid != our_nodeid) && from_other) {
800 /* our rsb was not master, and another node (not the dir node)
801 has sent us a request */
802 log_debug(ls, "find_rsb inactive from_other %d master %d dir %d %s",
803 from_nodeid, r->res_master_nodeid, dir_nodeid,
804 r->res_name);
805 write_unlock_bh(&ls->ls_rsbtbl_lock);
806 error = -ENOTBLK;
807 goto out;
808 }
809
810 if ((r->res_master_nodeid != our_nodeid) && from_dir) {
811 /* don't think this should ever happen */
812 log_error(ls, "find_rsb inactive from_dir %d master %d",
813 from_nodeid, r->res_master_nodeid);
814 dlm_print_rsb(r);
815 /* fix it and go on */
816 r->res_master_nodeid = our_nodeid;
817 r->res_nodeid = 0;
818 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
819 r->res_first_lkid = 0;
820 }
821
822 if (from_local && (r->res_master_nodeid != our_nodeid)) {
823 /* Because we have held no locks on this rsb,
824 res_master_nodeid could have become stale. */
825 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
826 r->res_first_lkid = 0;
827 }
828
829 /* we always cancel the scan timer for the rsb when we
830  * move it out of the inactive state, since the rsb state
831  * can change and scan timers are only for inactive
832  * rsbs.
833 */
834 del_scan(ls, r);
835 list_move(&r->res_slow_list, &ls->ls_slow_active);
836 rsb_clear_flag(r, RSB_INACTIVE);
837 kref_init(&r->res_ref); /* ref is now used in active state */
838 write_unlock_bh(&ls->ls_rsbtbl_lock);
839
840 goto out;
841
842
843 do_new:
844 /*
845 * rsb not found
846 */
847
848 if (error == -EBADR && !create)
849 goto out;
850
851 error = get_rsb_struct(ls, name, len, &r);
852 if (WARN_ON_ONCE(error))
853 goto out;
854
855 r->res_hash = hash;
856 r->res_dir_nodeid = dir_nodeid;
857 kref_init(&r->res_ref);
858
859 if (from_dir) {
860 /* want to see how often this happens */
861 log_debug(ls, "find_rsb new from_dir %d recreate %s",
862 from_nodeid, r->res_name);
863 r->res_master_nodeid = our_nodeid;
864 r->res_nodeid = 0;
865 goto out_add;
866 }
867
868 if (from_other && (dir_nodeid != our_nodeid)) {
869 /* should never happen */
870 log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
871 from_nodeid, dir_nodeid, our_nodeid, r->res_name);
872 dlm_free_rsb(r);
873 r = NULL;
874 error = -ENOTBLK;
875 goto out;
876 }
877
878 if (from_other) {
879 log_debug(ls, "find_rsb new from_other %d dir %d %s",
880 from_nodeid, dir_nodeid, r->res_name);
881 }
882
883 if (dir_nodeid == our_nodeid) {
884 /* When we are the dir nodeid, we can set the master
885 node immediately */
886 r->res_master_nodeid = our_nodeid;
887 r->res_nodeid = 0;
888 } else {
889 /* set_master will send_lookup to dir_nodeid */
890 r->res_master_nodeid = 0;
891 r->res_nodeid = -1;
892 }
893
894 out_add:
895
896 write_lock_bh(&ls->ls_rsbtbl_lock);
897 error = rsb_insert(r, &ls->ls_rsbtbl);
898 if (error == -EEXIST) {
899 /* somebody else was faster and the rsb exists
900  * now, so we do a complete relookup
901 */
902 write_unlock_bh(&ls->ls_rsbtbl_lock);
903 dlm_free_rsb(r);
904 goto retry;
905 } else if (!error) {
906 list_add(&r->res_slow_list, &ls->ls_slow_active);
907 }
908 write_unlock_bh(&ls->ls_rsbtbl_lock);
909 out:
910 *r_ret = r;
911 return error;
912 }
913
914 /* During recovery, other nodes can send us new MSTCPY locks (from
915 dlm_recover_locks) before we've made ourselves master (in
916 dlm_recover_masters). */
917
918 static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
919 uint32_t hash, int dir_nodeid, int from_nodeid,
920 unsigned int flags, struct dlm_rsb **r_ret)
921 {
922 struct dlm_rsb *r = NULL;
923 int our_nodeid = dlm_our_nodeid();
924 int recover = (flags & R_RECEIVE_RECOVER);
925 int error;
926
927 retry:
928 error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
929 if (error)
930 goto do_new;
931
932 /* check if the rsb is in active state under read lock - likely path */
933 read_lock_bh(&ls->ls_rsbtbl_lock);
934 if (!rsb_flag(r, RSB_HASHED)) {
935 read_unlock_bh(&ls->ls_rsbtbl_lock);
936 goto do_new;
937 }
938
939 if (rsb_flag(r, RSB_INACTIVE)) {
940 read_unlock_bh(&ls->ls_rsbtbl_lock);
941 goto do_inactive;
942 }
943
944 /*
945 * rsb is active, so we can't check master_nodeid without lock_rsb.
946 */
947
948 kref_get(&r->res_ref);
949 read_unlock_bh(&ls->ls_rsbtbl_lock);
950
951 goto out;
952
953
954 do_inactive:
955 write_lock_bh(&ls->ls_rsbtbl_lock);
956
957 /* See comment in find_rsb_dir. */
958 if (rsb_flag(r, RSB_HASHED)) {
959 if (!rsb_flag(r, RSB_INACTIVE)) {
960 write_unlock_bh(&ls->ls_rsbtbl_lock);
961 goto retry;
962 }
963 } else {
964 write_unlock_bh(&ls->ls_rsbtbl_lock);
965 goto do_new;
966 }
967
968
969 /*
970 * rsb found inactive. No other thread is using this rsb because
971 * it's inactive, so we can look at or update res_master_nodeid
972 * without lock_rsb.
973 */
974
975 if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
976 /* our rsb is not master, and another node has sent us a
977 request; this should never happen */
978 log_error(ls, "find_rsb inactive from_nodeid %d master %d dir %d",
979 from_nodeid, r->res_master_nodeid, dir_nodeid);
980 dlm_print_rsb(r);
981 write_unlock_bh(&ls->ls_rsbtbl_lock);
982 error = -ENOTBLK;
983 goto out;
984 }
985
986 if (!recover && (r->res_master_nodeid != our_nodeid) &&
987 (dir_nodeid == our_nodeid)) {
988 /* our rsb is not master, and we are dir; may as well fix it;
989 this should never happen */
990 log_error(ls, "find_rsb inactive our %d master %d dir %d",
991 our_nodeid, r->res_master_nodeid, dir_nodeid);
992 dlm_print_rsb(r);
993 r->res_master_nodeid = our_nodeid;
994 r->res_nodeid = 0;
995 }
996
997 del_scan(ls, r);
998 list_move(&r->res_slow_list, &ls->ls_slow_active);
999 rsb_clear_flag(r, RSB_INACTIVE);
1000 kref_init(&r->res_ref);
1001 write_unlock_bh(&ls->ls_rsbtbl_lock);
1002
1003 goto out;
1004
1005
1006 do_new:
1007 /*
1008 * rsb not found
1009 */
1010
1011 error = get_rsb_struct(ls, name, len, &r);
1012 if (WARN_ON_ONCE(error))
1013 goto out;
1014
1015 r->res_hash = hash;
1016 r->res_dir_nodeid = dir_nodeid;
1017 r->res_master_nodeid = dir_nodeid;
1018 r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
1019 kref_init(&r->res_ref);
1020
1021 write_lock_bh(&ls->ls_rsbtbl_lock);
1022 error = rsb_insert(r, &ls->ls_rsbtbl);
1023 if (error == -EEXIST) {
1024 /* somebody else was faster and the rsb exists
1025  * now, so we do a complete relookup
1026 */
1027 write_unlock_bh(&ls->ls_rsbtbl_lock);
1028 dlm_free_rsb(r);
1029 goto retry;
1030 } else if (!error) {
1031 list_add(&r->res_slow_list, &ls->ls_slow_active);
1032 }
1033 write_unlock_bh(&ls->ls_rsbtbl_lock);
1034
1035 out:
1036 *r_ret = r;
1037 return error;
1038 }
1039
1040 /*
1041 * rsb rcu usage
1042 *
1043 * While rcu read lock is held, the rsb cannot be freed,
1044 * which allows a lookup optimization.
1045 *
1046 * Two threads are accessing the same rsb concurrently,
1047 * the first (A) is trying to use the rsb, the second (B)
1048 * is trying to free the rsb.
1049 *
1050 * thread A thread B
1051 * (trying to use rsb) (trying to free rsb)
1052 *
1053 * A1. rcu read lock
1054 * A2. rsbtbl read lock
1055 * A3. look up rsb in rsbtbl
1056 * A4. rsbtbl read unlock
1057 * B1. rsbtbl write lock
1058 * B2. look up rsb in rsbtbl
1059 * B3. remove rsb from rsbtbl
1060 * B4. clear rsb HASHED flag
1061 * B5. rsbtbl write unlock
1062 * B6. begin freeing rsb using rcu...
1063 *
1064 * (rsb is inactive, so try to make it active again)
1065 * A5. read rsb HASHED flag (safe because rsb is not freed yet)
1066 * A6. the rsb HASHED flag is not set, which it means the rsb
1067 * is being removed from rsbtbl and freed, so don't use it.
1068 * A7. rcu read unlock
1069 *
1070 * B7. ...finish freeing rsb using rcu
1071 * A8. create a new rsb
1072 *
1073 * Without the rcu optimization, steps A5-8 would need to do
1074 * an extra rsbtbl lookup:
1075 * A5. rsbtbl write lock
1076 * A6. look up rsb in rsbtbl, not found
1077 * A7. rsbtbl write unlock
1078 * A8. create a new rsb
1079 */
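
/*
 * A condensed, illustrative sketch (not compiled) of the rcu lookup pattern
 * described above, following thread A's steps A1-A8; the find_rsb_*
 * functions below implement the real thing.
 */
#if 0
	rcu_read_lock();						/* A1 */
	read_lock_bh(&ls->ls_rsbtbl_lock);				/* A2 */
	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);	/* A3 */
	read_unlock_bh(&ls->ls_rsbtbl_lock);				/* A4 */
	if (!error && !rsb_flag(r, RSB_HASHED))				/* A5, A6 */
		error = -EBADR;	/* rsb is being removed/freed, don't use it */
	rcu_read_unlock();						/* A7 */
	if (error)
		/* A8: create a new rsb */;
#endif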
1080
1081 static int find_rsb(struct dlm_ls *ls, const void *name, int len,
1082 int from_nodeid, unsigned int flags,
1083 struct dlm_rsb **r_ret)
1084 {
1085 int dir_nodeid;
1086 uint32_t hash;
1087 int rv;
1088
1089 if (len > DLM_RESNAME_MAXLEN)
1090 return -EINVAL;
1091
1092 hash = jhash(name, len, 0);
1093 dir_nodeid = dlm_hash2nodeid(ls, hash);
1094
1095 rcu_read_lock();
1096 if (dlm_no_directory(ls))
1097 rv = find_rsb_nodir(ls, name, len, hash, dir_nodeid,
1098 from_nodeid, flags, r_ret);
1099 else
1100 rv = find_rsb_dir(ls, name, len, hash, dir_nodeid,
1101 from_nodeid, flags, r_ret);
1102 rcu_read_unlock();
1103 return rv;
1104 }
1105
1106 /* we have received a request and found that res_master_nodeid != our_nodeid,
1107 so we need to return an error or make ourselves the master */
1108
1109 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
1110 int from_nodeid)
1111 {
1112 if (dlm_no_directory(ls)) {
1113 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
1114 from_nodeid, r->res_master_nodeid,
1115 r->res_dir_nodeid);
1116 dlm_print_rsb(r);
1117 return -ENOTBLK;
1118 }
1119
1120 if (from_nodeid != r->res_dir_nodeid) {
1121 /* our rsb is not master, and another node (not the dir node)
1122 has sent us a request. this is much more common when our
1123 master_nodeid is zero, so limit debug to non-zero. */
1124
1125 if (r->res_master_nodeid) {
1126 log_debug(ls, "validate master from_other %d master %d "
1127 "dir %d first %x %s", from_nodeid,
1128 r->res_master_nodeid, r->res_dir_nodeid,
1129 r->res_first_lkid, r->res_name);
1130 }
1131 return -ENOTBLK;
1132 } else {
1133 /* our rsb is not master, but the dir nodeid has sent us a
1134 request; this could happen with master 0 / res_nodeid -1 */
1135
1136 if (r->res_master_nodeid) {
1137 log_error(ls, "validate master from_dir %d master %d "
1138 "first %x %s",
1139 from_nodeid, r->res_master_nodeid,
1140 r->res_first_lkid, r->res_name);
1141 }
1142
1143 r->res_master_nodeid = dlm_our_nodeid();
1144 r->res_nodeid = 0;
1145 return 0;
1146 }
1147 }
1148
1149 static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
1150 int from_nodeid, bool is_inactive, unsigned int flags,
1151 int *r_nodeid, int *result)
1152 {
1153 int fix_master = (flags & DLM_LU_RECOVER_MASTER);
1154 int from_master = (flags & DLM_LU_RECOVER_DIR);
1155
1156 if (r->res_dir_nodeid != our_nodeid) {
1157 /* should not happen, but may as well fix it and carry on */
1158 log_error(ls, "%s res_dir %d our %d %s", __func__,
1159 r->res_dir_nodeid, our_nodeid, r->res_name);
1160 r->res_dir_nodeid = our_nodeid;
1161 }
1162
1163 if (fix_master && r->res_master_nodeid && dlm_is_removed(ls, r->res_master_nodeid)) {
1164 /* Recovery uses this function to set a new master when
1165 * the previous master failed. Setting NEW_MASTER will
1166 * force dlm_recover_masters to call recover_master on this
1167 * rsb even though the res_nodeid is no longer removed.
1168 */
1169
1170 r->res_master_nodeid = from_nodeid;
1171 r->res_nodeid = from_nodeid;
1172 rsb_set_flag(r, RSB_NEW_MASTER);
1173
1174 if (is_inactive) {
1175 /* I don't think we should ever find it inactive. */
1176 log_error(ls, "%s fix_master inactive", __func__);
1177 dlm_dump_rsb(r);
1178 }
1179 }
1180
1181 if (from_master && (r->res_master_nodeid != from_nodeid)) {
1182 /* this will happen if from_nodeid became master during
1183 * a previous recovery cycle, and we aborted the previous
1184 * cycle before recovering this master value
1185 */
1186
1187 log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
1188 __func__, from_nodeid, r->res_master_nodeid,
1189 r->res_nodeid, r->res_first_lkid, r->res_name);
1190
1191 if (r->res_master_nodeid == our_nodeid) {
1192 log_error(ls, "from_master %d our_master", from_nodeid);
1193 dlm_dump_rsb(r);
1194 goto ret_assign;
1195 }
1196
1197 r->res_master_nodeid = from_nodeid;
1198 r->res_nodeid = from_nodeid;
1199 rsb_set_flag(r, RSB_NEW_MASTER);
1200 }
1201
1202 if (!r->res_master_nodeid) {
1203 /* this will happen if recovery happens while we're looking
1204 * up the master for this rsb
1205 */
1206
1207 log_debug(ls, "%s master 0 to %d first %x %s", __func__,
1208 from_nodeid, r->res_first_lkid, r->res_name);
1209 r->res_master_nodeid = from_nodeid;
1210 r->res_nodeid = from_nodeid;
1211 }
1212
1213 if (!from_master && !fix_master &&
1214 (r->res_master_nodeid == from_nodeid)) {
1215 /* this can happen when the master sends remove, the dir node
1216 * finds the rsb on the active list and ignores the remove,
1217 * and the former master sends a lookup
1218 */
1219
1220 log_limit(ls, "%s from master %d flags %x first %x %s",
1221 __func__, from_nodeid, flags, r->res_first_lkid,
1222 r->res_name);
1223 }
1224
1225 ret_assign:
1226 *r_nodeid = r->res_master_nodeid;
1227 if (result)
1228 *result = DLM_LU_MATCH;
1229 }
1230
1231 /*
1232 * We're the dir node for this res and another node wants to know the
1233 * master nodeid. During normal operation (non recovery) this is only
1234 * called from receive_lookup(); master lookups when the local node is
1235 * the dir node are done by find_rsb().
1236 *
1237 * normal operation, we are the dir node for a resource
1238 * . _request_lock
1239 * . set_master
1240 * . send_lookup
1241 * . receive_lookup
1242 * . dlm_master_lookup flags 0
1243 *
1244 * recover directory, we are rebuilding dir for all resources
1245 * . dlm_recover_directory
1246 * . dlm_rcom_names
1247 * remote node sends back the rsb names it is master of and we are dir of
1248 * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
1249 * we either create new rsb setting remote node as master, or find existing
1250 * rsb and set master to be the remote node.
1251 *
1252 * recover masters, we are finding the new master for resources
1253 * . dlm_recover_masters
1254 * . recover_master
1255 * . dlm_send_rcom_lookup
1256 * . receive_rcom_lookup
1257 * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
1258 */
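
/*
 * Hypothetical illustration (not compiled): handling a lookup from node 2
 * during normal operation (flags 0), returning the master nodeid.
 */
#if 0
	int r_nodeid, result;
	int error;

	error = dlm_master_lookup(ls, 2, name, len, 0, &r_nodeid, &result);
	/* result is DLM_LU_MATCH if the rsb existed, DLM_LU_ADD if created */
#endif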
1259
1260 static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
1261 int len, unsigned int flags, int *r_nodeid, int *result)
1262 {
1263 struct dlm_rsb *r = NULL;
1264 uint32_t hash;
1265 int our_nodeid = dlm_our_nodeid();
1266 int dir_nodeid, error;
1267
1268 if (len > DLM_RESNAME_MAXLEN)
1269 return -EINVAL;
1270
1271 if (from_nodeid == our_nodeid) {
1272 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
1273 our_nodeid, flags);
1274 return -EINVAL;
1275 }
1276
1277 hash = jhash(name, len, 0);
1278 dir_nodeid = dlm_hash2nodeid(ls, hash);
1279 if (dir_nodeid != our_nodeid) {
1280 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
1281 from_nodeid, dir_nodeid, our_nodeid, hash,
1282 ls->ls_num_nodes);
1283 *r_nodeid = -1;
1284 return -EINVAL;
1285 }
1286
1287 retry:
1288 error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1289 if (error)
1290 goto not_found;
1291
1292 /* check if the rsb is active under read lock - likely path */
1293 read_lock_bh(&ls->ls_rsbtbl_lock);
1294 if (!rsb_flag(r, RSB_HASHED)) {
1295 read_unlock_bh(&ls->ls_rsbtbl_lock);
1296 goto not_found;
1297 }
1298
1299 if (rsb_flag(r, RSB_INACTIVE)) {
1300 read_unlock_bh(&ls->ls_rsbtbl_lock);
1301 goto do_inactive;
1302 }
1303
1304 /* because the rsb is active, we need to lock_rsb before
1305  * checking/changing res_master_nodeid
1306 */
1307
1308 hold_rsb(r);
1309 read_unlock_bh(&ls->ls_rsbtbl_lock);
1310 lock_rsb(r);
1311
1312 __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
1313 flags, r_nodeid, result);
1314
1315 /* the rsb was active */
1316 unlock_rsb(r);
1317 put_rsb(r);
1318
1319 return 0;
1320
1321 do_inactive:
1322 /* unlikely path - check if still part of ls_rsbtbl */
1323 write_lock_bh(&ls->ls_rsbtbl_lock);
1324
1325 /* see comment in find_rsb_dir */
1326 if (rsb_flag(r, RSB_HASHED)) {
1327 if (!rsb_flag(r, RSB_INACTIVE)) {
1328 write_unlock_bh(&ls->ls_rsbtbl_lock);
1329 /* something has changed, very unlikely but
1330 * try again
1331 */
1332 goto retry;
1333 }
1334 } else {
1335 write_unlock_bh(&ls->ls_rsbtbl_lock);
1336 goto not_found;
1337 }
1338
1339 /* because the rsb is inactive, it's not refcounted and lock_rsb
1340 is not used, but is protected by the rsbtbl lock */
1341
1342 __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
1343 r_nodeid, result);
1344
1345 /* A dir record rsb should never be on scan list.
1346 * Except when we are the dir and master node.
1347 * This function should only be called by the dir
1348 * node.
1349 */
1350 WARN_ON(!list_empty(&r->res_scan_list) &&
1351 r->res_master_nodeid != our_nodeid);
1352
1353 write_unlock_bh(&ls->ls_rsbtbl_lock);
1354
1355 return 0;
1356
1357 not_found:
1358 error = get_rsb_struct(ls, name, len, &r);
1359 if (WARN_ON_ONCE(error))
1360 goto out;
1361
1362 r->res_hash = hash;
1363 r->res_dir_nodeid = our_nodeid;
1364 r->res_master_nodeid = from_nodeid;
1365 r->res_nodeid = from_nodeid;
1366 rsb_set_flag(r, RSB_INACTIVE);
1367
1368 write_lock_bh(&ls->ls_rsbtbl_lock);
1369 error = rsb_insert(r, &ls->ls_rsbtbl);
1370 if (error == -EEXIST) {
1371 /* somebody else was faster and the rsb exists
1372  * now, so we do a complete relookup
1373 */
1374 write_unlock_bh(&ls->ls_rsbtbl_lock);
1375 dlm_free_rsb(r);
1376 goto retry;
1377 } else if (error) {
1378 write_unlock_bh(&ls->ls_rsbtbl_lock);
1379 /* should never happen */
1380 dlm_free_rsb(r);
1381 goto retry;
1382 }
1383
1384 list_add(&r->res_slow_list, &ls->ls_slow_inactive);
1385 write_unlock_bh(&ls->ls_rsbtbl_lock);
1386
1387 if (result)
1388 *result = DLM_LU_ADD;
1389 *r_nodeid = from_nodeid;
1390 out:
1391 return error;
1392 }
1393
1394 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
1395 int len, unsigned int flags, int *r_nodeid, int *result)
1396 {
1397 int rv;
1398 rcu_read_lock();
1399 rv = _dlm_master_lookup(ls, from_nodeid, name, len, flags, r_nodeid, result);
1400 rcu_read_unlock();
1401 return rv;
1402 }
1403
1404 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1405 {
1406 struct dlm_rsb *r;
1407
1408 read_lock_bh(&ls->ls_rsbtbl_lock);
1409 list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
1410 if (r->res_hash == hash)
1411 dlm_dump_rsb(r);
1412 }
1413 read_unlock_bh(&ls->ls_rsbtbl_lock);
1414 }
1415
1416 void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
1417 {
1418 struct dlm_rsb *r = NULL;
1419 int error;
1420
1421 rcu_read_lock();
1422 error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1423 if (error)
1424 goto out;
1425
1426 dlm_dump_rsb(r);
1427 out:
1428 rcu_read_unlock();
1429 }
1430
1431 static void deactivate_rsb(struct kref *kref)
1432 {
1433 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1434 struct dlm_ls *ls = r->res_ls;
1435 int our_nodeid = dlm_our_nodeid();
1436
1437 DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1438 rsb_set_flag(r, RSB_INACTIVE);
1439 list_move(&r->res_slow_list, &ls->ls_slow_inactive);
1440
1441 /*
1442 * When the rsb becomes unused, there are two possibilities:
1443 * 1. Leave the inactive rsb in place (don't remove it).
1444 * 2. Add it to the scan list to be removed.
1445 *
1446 * 1 is done when the rsb is acting as the dir record
1447 * for a remotely mastered rsb. The rsb must be left
1448 * in place as an inactive rsb to act as the dir record.
1449 *
1450 * 2 is done when a) the rsb is not the master and not the
1451 * dir record, b) when the rsb is both the master and the
1452 * dir record, c) when the rsb is master but not dir record.
1453 *
1454 * (If no directory is used, the rsb can always be removed.)
1455 */
1456 if (dlm_no_directory(ls) ||
1457 (r->res_master_nodeid == our_nodeid ||
1458 dlm_dir_nodeid(r) != our_nodeid))
1459 add_scan(ls, r);
1460
1461 if (r->res_lvbptr) {
1462 dlm_free_lvb(r->res_lvbptr);
1463 r->res_lvbptr = NULL;
1464 }
1465 }
1466
1467 void free_inactive_rsb(struct dlm_rsb *r)
1468 {
1469 WARN_ON_ONCE(!rsb_flag(r, RSB_INACTIVE));
1470
1471 DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1472 DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1473 DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1474 DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1475 DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1476 DLM_ASSERT(list_empty(&r->res_scan_list), dlm_dump_rsb(r););
1477 DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1478 DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r););
1479
1480 dlm_free_rsb(r);
1481 }
1482
1483 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1484 The rsb must exist as long as any lkb's for it do. */
1485
1486 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1487 {
1488 hold_rsb(r);
1489 lkb->lkb_resource = r;
1490 }
1491
1492 static void detach_lkb(struct dlm_lkb *lkb)
1493 {
1494 if (lkb->lkb_resource) {
1495 put_rsb(lkb->lkb_resource);
1496 lkb->lkb_resource = NULL;
1497 }
1498 }
1499
1500 static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
1501 unsigned long start, unsigned long end)
1502 {
1503 struct xa_limit limit;
1504 struct dlm_lkb *lkb;
1505 int rv;
1506
1507 limit.max = end;
1508 limit.min = start;
1509
1510 lkb = dlm_allocate_lkb();
1511 if (!lkb)
1512 return -ENOMEM;
1513
1514 lkb->lkb_last_bast_cb_mode = DLM_LOCK_IV;
1515 lkb->lkb_last_cast_cb_mode = DLM_LOCK_IV;
1516 lkb->lkb_last_cb_mode = DLM_LOCK_IV;
1517 lkb->lkb_nodeid = -1;
1518 lkb->lkb_grmode = DLM_LOCK_IV;
1519 kref_init(&lkb->lkb_ref);
1520 INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1521 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1522
1523 write_lock_bh(&ls->ls_lkbxa_lock);
1524 rv = xa_alloc(&ls->ls_lkbxa, &lkb->lkb_id, lkb, limit, GFP_ATOMIC);
1525 write_unlock_bh(&ls->ls_lkbxa_lock);
1526
1527 if (rv < 0) {
1528 log_error(ls, "create_lkb xa error %d", rv);
1529 dlm_free_lkb(lkb);
1530 return rv;
1531 }
1532
1533 *lkb_ret = lkb;
1534 return 0;
1535 }
1536
1537 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1538 {
1539 return _create_lkb(ls, lkb_ret, 1, ULONG_MAX);
1540 }
1541
1542 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1543 {
1544 struct dlm_lkb *lkb;
1545
1546 rcu_read_lock();
1547 lkb = xa_load(&ls->ls_lkbxa, lkid);
1548 if (lkb) {
1549 /* check if lkb is still part of lkbxa under lkbxa_lock as
1550  * the lkb_ref is tied to the lkbxa data structure, see
1551 * __put_lkb().
1552 */
1553 read_lock_bh(&ls->ls_lkbxa_lock);
1554 if (kref_read(&lkb->lkb_ref))
1555 kref_get(&lkb->lkb_ref);
1556 else
1557 lkb = NULL;
1558 read_unlock_bh(&ls->ls_lkbxa_lock);
1559 }
1560 rcu_read_unlock();
1561
1562 *lkb_ret = lkb;
1563 return lkb ? 0 : -ENOENT;
1564 }
1565
1566 static void kill_lkb(struct kref *kref)
1567 {
1568 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1569
1570 /* All work is done after the return from kref_put() so we
1571 can release the write_lock before the detach_lkb */
1572
1573 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1574 }
1575
1576 /* __put_lkb() is used when an lkb may not have an rsb attached to
1577 it so we need to provide the lockspace explicitly */
1578
1579 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1580 {
1581 uint32_t lkid = lkb->lkb_id;
1582 int rv;
1583
1584 rv = dlm_kref_put_write_lock_bh(&lkb->lkb_ref, kill_lkb,
1585 &ls->ls_lkbxa_lock);
1586 if (rv) {
1587 xa_erase(&ls->ls_lkbxa, lkid);
1588 write_unlock_bh(&ls->ls_lkbxa_lock);
1589
1590 detach_lkb(lkb);
1591
1592 /* for local/process lkbs, lvbptr points to caller's lksb */
1593 if (lkb->lkb_lvbptr && is_master_copy(lkb))
1594 dlm_free_lvb(lkb->lkb_lvbptr);
1595 dlm_free_lkb(lkb);
1596 }
1597
1598 return rv;
1599 }
1600
1601 int dlm_put_lkb(struct dlm_lkb *lkb)
1602 {
1603 struct dlm_ls *ls;
1604
1605 DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1606 DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1607
1608 ls = lkb->lkb_resource->res_ls;
1609 return __put_lkb(ls, lkb);
1610 }
1611
1612 /* This is only called to add a reference when the code already holds
1613 a valid reference to the lkb, so there's no need for locking. */
1614
1615 static inline void hold_lkb(struct dlm_lkb *lkb)
1616 {
1617 kref_get(&lkb->lkb_ref);
1618 }
1619
1620 static void unhold_lkb_assert(struct kref *kref)
1621 {
1622 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1623
1624 DLM_ASSERT(false, dlm_print_lkb(lkb););
1625 }
1626
1627 /* This is called when we need to remove a reference and are certain
1628 it's not the last ref. e.g. del_lkb is always called between a
1629 find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1630 put_lkb would work fine, but would involve unnecessary locking */
1631
1632 static inline void unhold_lkb(struct dlm_lkb *lkb)
1633 {
1634 kref_put(&lkb->lkb_ref, unhold_lkb_assert);
1635 }
1636
1637 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1638 int mode)
1639 {
1640 struct dlm_lkb *lkb = NULL, *iter;
1641
1642 list_for_each_entry(iter, head, lkb_statequeue)
1643 if (iter->lkb_rqmode < mode) {
1644 lkb = iter;
1645 list_add_tail(new, &iter->lkb_statequeue);
1646 break;
1647 }
1648
1649 if (!lkb)
1650 list_add_tail(new, head);
1651 }
1652
1653 /* add/remove lkb to rsb's grant/convert/wait queue */
1654
1655 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1656 {
1657 kref_get(&lkb->lkb_ref);
1658
1659 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1660
1661 lkb->lkb_timestamp = ktime_get();
1662
1663 lkb->lkb_status = status;
1664
1665 switch (status) {
1666 case DLM_LKSTS_WAITING:
1667 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1668 list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1669 else
1670 list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1671 break;
1672 case DLM_LKSTS_GRANTED:
1673 /* convention says granted locks kept in order of grmode */
1674 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1675 lkb->lkb_grmode);
1676 break;
1677 case DLM_LKSTS_CONVERT:
1678 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1679 list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1680 else
1681 list_add_tail(&lkb->lkb_statequeue,
1682 &r->res_convertqueue);
1683 break;
1684 default:
1685 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1686 }
1687 }
1688
1689 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1690 {
1691 lkb->lkb_status = 0;
1692 list_del(&lkb->lkb_statequeue);
1693 unhold_lkb(lkb);
1694 }
1695
1696 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1697 {
1698 del_lkb(r, lkb);
1699 add_lkb(r, lkb, sts);
1700 }
1701
1702 static int msg_reply_type(int mstype)
1703 {
1704 switch (mstype) {
1705 case DLM_MSG_REQUEST:
1706 return DLM_MSG_REQUEST_REPLY;
1707 case DLM_MSG_CONVERT:
1708 return DLM_MSG_CONVERT_REPLY;
1709 case DLM_MSG_UNLOCK:
1710 return DLM_MSG_UNLOCK_REPLY;
1711 case DLM_MSG_CANCEL:
1712 return DLM_MSG_CANCEL_REPLY;
1713 case DLM_MSG_LOOKUP:
1714 return DLM_MSG_LOOKUP_REPLY;
1715 }
1716 return -1;
1717 }
1718
1719 /* add/remove lkb from global waiters list of lkb's waiting for
1720 a reply from a remote node */
1721
1722 static void add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1723 {
1724 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1725
1726 spin_lock_bh(&ls->ls_waiters_lock);
1727 if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1728 switch (mstype) {
1729 case DLM_MSG_UNLOCK:
1730 set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
1731 break;
1732 case DLM_MSG_CANCEL:
1733 set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
1734 break;
1735 default:
1736 /* should never happen as validate_lock_args() checks
1737 * on lkb_wait_type and validate_unlock_args() only
1738 * creates UNLOCK or CANCEL messages.
1739 */
1740 WARN_ON_ONCE(1);
1741 goto out;
1742 }
1743 lkb->lkb_wait_count++;
1744 hold_lkb(lkb);
1745
1746 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1747 lkb->lkb_id, lkb->lkb_wait_type, mstype,
1748 lkb->lkb_wait_count, dlm_iflags_val(lkb));
1749 goto out;
1750 }
1751
1752 DLM_ASSERT(!lkb->lkb_wait_count,
1753 dlm_print_lkb(lkb);
1754 printk("wait_count %d\n", lkb->lkb_wait_count););
1755
1756 lkb->lkb_wait_count++;
1757 lkb->lkb_wait_type = mstype;
1758 lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1759 hold_lkb(lkb);
1760 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1761 out:
1762 spin_unlock_bh(&ls->ls_waiters_lock);
1763 }
1764
1765 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1766 list as part of process_requestqueue (e.g. a lookup that has an optimized
1767 request reply on the requestqueue) between dlm_recover_waiters_pre() which
1768 set RESEND and dlm_recover_waiters_post() */
1769
1770 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1771 const struct dlm_message *ms)
1772 {
1773 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1774 int overlap_done = 0;
1775
1776 if (mstype == DLM_MSG_UNLOCK_REPLY &&
1777 test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
1778 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1779 overlap_done = 1;
1780 goto out_del;
1781 }
1782
1783 if (mstype == DLM_MSG_CANCEL_REPLY &&
1784 test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1785 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1786 overlap_done = 1;
1787 goto out_del;
1788 }
1789
1790 /* Cancel state was preemptively cleared by a successful convert,
1791 see next comment, nothing to do. */
1792
1793 if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1794 (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1795 log_debug(ls, "remwait %x cancel_reply wait_type %d",
1796 lkb->lkb_id, lkb->lkb_wait_type);
1797 return -1;
1798 }
1799
1800 	/* Remove for the convert reply, and preemptively remove for the
1801 cancel reply. A convert has been granted while there's still
1802 an outstanding cancel on it (the cancel is moot and the result
1803 in the cancel reply should be 0). We preempt the cancel reply
1804 because the app gets the convert result and then can follow up
1805 with another op, like convert. This subsequent op would see the
1806 lingering state of the cancel and fail with -EBUSY. */
1807
1808 if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1809 (lkb->lkb_wait_type == DLM_MSG_CONVERT) && ms && !ms->m_result &&
1810 test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1811 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1812 lkb->lkb_id);
1813 lkb->lkb_wait_type = 0;
1814 lkb->lkb_wait_count--;
1815 unhold_lkb(lkb);
1816 goto out_del;
1817 }
1818
1819 /* N.B. type of reply may not always correspond to type of original
1820 msg due to lookup->request optimization, verify others? */
1821
1822 if (lkb->lkb_wait_type) {
1823 lkb->lkb_wait_type = 0;
1824 goto out_del;
1825 }
1826
1827 log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1828 lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
1829 lkb->lkb_remid, mstype, dlm_iflags_val(lkb));
1830 return -1;
1831
1832 out_del:
1833 /* the force-unlock/cancel has completed and we haven't recvd a reply
1834 to the op that was in progress prior to the unlock/cancel; we
1835 give up on any reply to the earlier op. FIXME: not sure when/how
1836 this would happen */
1837
1838 if (overlap_done && lkb->lkb_wait_type) {
1839 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1840 lkb->lkb_id, mstype, lkb->lkb_wait_type);
1841 lkb->lkb_wait_count--;
1842 unhold_lkb(lkb);
1843 lkb->lkb_wait_type = 0;
1844 }
1845
1846 DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1847
1848 clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
1849 lkb->lkb_wait_count--;
1850 if (!lkb->lkb_wait_count)
1851 list_del_init(&lkb->lkb_wait_reply);
1852 unhold_lkb(lkb);
1853 return 0;
1854 }
1855
1856 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1857 {
1858 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1859 int error;
1860
1861 spin_lock_bh(&ls->ls_waiters_lock);
1862 error = _remove_from_waiters(lkb, mstype, NULL);
1863 spin_unlock_bh(&ls->ls_waiters_lock);
1864 return error;
1865 }
1866
1867 /* Handles situations where we might be processing a "fake" or "local" reply in
1868 * the recovery context which stops any locking activity. Only debugfs might
1869 * change the lockspace waiters but they will hold the recovery lock to ensure
1870 * remove_from_waiters_ms() in the local case will be the only user manipulating the
1871 * lockspace waiters in recovery context.
1872 */
1873
1874 static int remove_from_waiters_ms(struct dlm_lkb *lkb,
1875 const struct dlm_message *ms, bool local)
1876 {
1877 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1878 int error;
1879
1880 if (!local)
1881 spin_lock_bh(&ls->ls_waiters_lock);
1882 else
1883 WARN_ON_ONCE(!rwsem_is_locked(&ls->ls_in_recovery) ||
1884 !dlm_locking_stopped(ls));
1885 error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
1886 if (!local)
1887 spin_unlock_bh(&ls->ls_waiters_lock);
1888 return error;
1889 }
1890
1891 /* lkb is master or local copy */
1892
1893 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1894 {
1895 int b, len = r->res_ls->ls_lvblen;
1896
1897 /* b=1 lvb returned to caller
1898 b=0 lvb written to rsb or invalidated
1899 b=-1 do nothing */
1900
1901 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1902
1903 if (b == 1) {
1904 if (!lkb->lkb_lvbptr)
1905 return;
1906
1907 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1908 return;
1909
1910 if (!r->res_lvbptr)
1911 return;
1912
1913 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1914 lkb->lkb_lvbseq = r->res_lvbseq;
1915
1916 } else if (b == 0) {
1917 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1918 rsb_set_flag(r, RSB_VALNOTVALID);
1919 return;
1920 }
1921
1922 if (!lkb->lkb_lvbptr)
1923 return;
1924
1925 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1926 return;
1927
1928 if (!r->res_lvbptr)
1929 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1930
1931 if (!r->res_lvbptr)
1932 return;
1933
1934 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1935 r->res_lvbseq++;
1936 lkb->lkb_lvbseq = r->res_lvbseq;
1937 rsb_clear_flag(r, RSB_VALNOTVALID);
1938 }
1939
1940 if (rsb_flag(r, RSB_VALNOTVALID))
1941 set_bit(DLM_SBF_VALNOTVALID_BIT, &lkb->lkb_sbflags);
1942 }
1943
1944 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1945 {
1946 if (lkb->lkb_grmode < DLM_LOCK_PW)
1947 return;
1948
1949 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1950 rsb_set_flag(r, RSB_VALNOTVALID);
1951 return;
1952 }
1953
1954 if (!lkb->lkb_lvbptr)
1955 return;
1956
1957 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1958 return;
1959
1960 if (!r->res_lvbptr)
1961 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1962
1963 if (!r->res_lvbptr)
1964 return;
1965
1966 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1967 r->res_lvbseq++;
1968 rsb_clear_flag(r, RSB_VALNOTVALID);
1969 }
1970
1971 /* lkb is process copy (pc) */
1972
1973 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1974 const struct dlm_message *ms)
1975 {
1976 int b;
1977
1978 if (!lkb->lkb_lvbptr)
1979 return;
1980
1981 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1982 return;
1983
1984 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1985 if (b == 1) {
1986 int len = receive_extralen(ms);
1987 if (len > r->res_ls->ls_lvblen)
1988 len = r->res_ls->ls_lvblen;
1989 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1990 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
1991 }
1992 }
1993
1994 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1995 remove_lock -- used for unlock, removes lkb from granted
1996 revert_lock -- used for cancel, moves lkb from convert to granted
1997 grant_lock -- used for request and convert, adds lkb to granted or
1998 moves lkb from convert or waiting to granted
1999
2000 Each of these is used for master or local copy lkb's. There is
2001 also a _pc() variation used to make the corresponding change on
2002 a process copy (pc) lkb. */
2003
2004 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2005 {
2006 del_lkb(r, lkb);
2007 lkb->lkb_grmode = DLM_LOCK_IV;
2008 /* this unhold undoes the original ref from create_lkb()
2009 so this leads to the lkb being freed */
2010 unhold_lkb(lkb);
2011 }
2012
2013 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2014 {
2015 set_lvb_unlock(r, lkb);
2016 _remove_lock(r, lkb);
2017 }
2018
2019 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2020 {
2021 _remove_lock(r, lkb);
2022 }
2023
2024 /* returns: 0 did nothing
2025 1 moved lock to granted
2026 -1 removed lock */
2027
2028 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2029 {
2030 int rv = 0;
2031
2032 lkb->lkb_rqmode = DLM_LOCK_IV;
2033
2034 switch (lkb->lkb_status) {
2035 case DLM_LKSTS_GRANTED:
2036 break;
2037 case DLM_LKSTS_CONVERT:
2038 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2039 rv = 1;
2040 break;
2041 case DLM_LKSTS_WAITING:
2042 del_lkb(r, lkb);
2043 lkb->lkb_grmode = DLM_LOCK_IV;
2044 /* this unhold undoes the original ref from create_lkb()
2045 so this leads to the lkb being freed */
2046 unhold_lkb(lkb);
2047 rv = -1;
2048 break;
2049 default:
2050 log_print("invalid status for revert %d", lkb->lkb_status);
2051 }
2052 return rv;
2053 }
2054
2055 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2056 {
2057 return revert_lock(r, lkb);
2058 }
2059
2060 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2061 {
2062 if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2063 lkb->lkb_grmode = lkb->lkb_rqmode;
2064 if (lkb->lkb_status)
2065 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2066 else
2067 add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2068 }
2069
2070 lkb->lkb_rqmode = DLM_LOCK_IV;
2071 lkb->lkb_highbast = 0;
2072 }
2073
2074 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2075 {
2076 set_lvb_lock(r, lkb);
2077 _grant_lock(r, lkb);
2078 }
2079
2080 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2081 const struct dlm_message *ms)
2082 {
2083 set_lvb_lock_pc(r, lkb, ms);
2084 _grant_lock(r, lkb);
2085 }
2086
2087 /* called by grant_pending_locks() which means an async grant message must
2088 be sent to the requesting node in addition to granting the lock if the
2089 lkb belongs to a remote node. */
2090
2091 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
2092 {
2093 grant_lock(r, lkb);
2094 if (is_master_copy(lkb))
2095 send_grant(r, lkb);
2096 else
2097 queue_cast(r, lkb, 0);
2098 }
2099
2100 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2101 change the granted/requested modes. We're munging things accordingly in
2102 the process copy.
2103 CONVDEADLK: our grmode may have been forced down to NL to resolve a
2104 conversion deadlock
2105 ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2106 compatible with other granted locks */
2107
2108 static void munge_demoted(struct dlm_lkb *lkb)
2109 {
2110 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2111 log_print("munge_demoted %x invalid modes gr %d rq %d",
2112 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2113 return;
2114 }
2115
2116 lkb->lkb_grmode = DLM_LOCK_NL;
2117 }
2118
2119 static void munge_altmode(struct dlm_lkb *lkb, const struct dlm_message *ms)
2120 {
2121 if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
2122 ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
2123 log_print("munge_altmode %x invalid reply type %d",
2124 lkb->lkb_id, le32_to_cpu(ms->m_type));
2125 return;
2126 }
2127
2128 if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2129 lkb->lkb_rqmode = DLM_LOCK_PR;
2130 else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2131 lkb->lkb_rqmode = DLM_LOCK_CW;
2132 else {
2133 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2134 dlm_print_lkb(lkb);
2135 }
2136 }
2137
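/* return true if lkb is the first entry on the given rsb status queue */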
2138 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2139 {
2140 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2141 lkb_statequeue);
2142 if (lkb->lkb_id == first->lkb_id)
2143 return 1;
2144
2145 return 0;
2146 }
2147
2148 /* Check if the given lkb conflicts with another lkb on the queue. */
2149
2150 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2151 {
2152 struct dlm_lkb *this;
2153
2154 list_for_each_entry(this, head, lkb_statequeue) {
2155 if (this == lkb)
2156 continue;
2157 if (!modes_compat(this, lkb))
2158 return 1;
2159 }
2160 return 0;
2161 }
2162
2163 /*
2164 * "A conversion deadlock arises with a pair of lock requests in the converting
2165 * queue for one resource. The granted mode of each lock blocks the requested
2166 * mode of the other lock."
2167 *
2168 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2169 * convert queue from being granted, then deadlk/demote lkb.
2170 *
2171 * Example:
2172 * Granted Queue: empty
2173 * Convert Queue: NL->EX (first lock)
2174 * PR->EX (second lock)
2175 *
2176 * The first lock can't be granted because of the granted mode of the second
2177 * lock and the second lock can't be granted because it's not first in the
2178 * list. We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2179 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2180 * flag set and return DEMOTED in the lksb flags.
2181 *
2182 * Originally, this function detected conv-deadlk in a more limited scope:
2183 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2184 * - if lkb1 was the first entry in the queue (not just earlier), and was
2185 * blocked by the granted mode of lkb2, and there was nothing on the
2186 * granted queue preventing lkb1 from being granted immediately, i.e.
2187 * lkb2 was the only thing preventing lkb1 from being granted.
2188 *
2189 * That second condition meant we'd only say there was conv-deadlk if
2190 * resolving it (by demotion) would lead to the first lock on the convert
2191 * queue being granted right away. It allowed conversion deadlocks to exist
2192 * between locks on the convert queue while they couldn't be granted anyway.
2193 *
2194 * Now, we detect and take action on conversion deadlocks immediately when
2195 * they're created, even if they may not be immediately consequential. If
2196 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2197 * mode that would prevent lkb1's conversion from being granted, we do a
2198 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2199 * I think this means that the lkb_is_ahead condition below should always
2200 * be zero, i.e. there will never be conv-deadlk between two locks that are
2201 * both already on the convert queue.
2202 */
2203
2204 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2205 {
2206 struct dlm_lkb *lkb1;
2207 int lkb_is_ahead = 0;
2208
2209 list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2210 if (lkb1 == lkb2) {
2211 lkb_is_ahead = 1;
2212 continue;
2213 }
2214
2215 if (!lkb_is_ahead) {
2216 if (!modes_compat(lkb2, lkb1))
2217 return 1;
2218 } else {
2219 if (!modes_compat(lkb2, lkb1) &&
2220 !modes_compat(lkb1, lkb2))
2221 return 1;
2222 }
2223 }
2224 return 0;
2225 }
2226
2227 /*
2228 * Return 1 if the lock can be granted, 0 otherwise.
2229 * Also detect and resolve conversion deadlocks.
2230 *
2231 * lkb is the lock to be granted
2232 *
2233 * now is 1 if the function is being called in the context of the
2234 * immediate request, it is 0 if called later, after the lock has been
2235 * queued.
2236 *
2237 * recover is 1 if dlm_recover_grant() is trying to grant conversions
2238 * after recovery.
2239 *
2240 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2241 */
2242
2243 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2244 int recover)
2245 {
2246 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2247
2248 /*
2249 * 6-10: Version 5.4 introduced an option to address the phenomenon of
2250 * a new request for a NL mode lock being blocked.
2251 *
2252 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2253 * request, then it would be granted. In essence, the use of this flag
2254 	 * tells the Lock Manager to expedite this request by not considering
2255 * what may be in the CONVERTING or WAITING queues... As of this
2256 * writing, the EXPEDITE flag can be used only with new requests for NL
2257 * mode locks. This flag is not valid for conversion requests.
2258 *
2259 * A shortcut. Earlier checks return an error if EXPEDITE is used in a
2260 * conversion or used with a non-NL requested mode. We also know an
2261 * EXPEDITE request is always granted immediately, so now must always
2262 * be 1. The full condition to grant an expedite request: (now &&
2263 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2264 * therefore be shortened to just checking the flag.
2265 */
2266
2267 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2268 return 1;
2269
2270 /*
2271 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2272 * added to the remaining conditions.
2273 */
2274
2275 if (queue_conflict(&r->res_grantqueue, lkb))
2276 return 0;
2277
2278 /*
2279 * 6-3: By default, a conversion request is immediately granted if the
2280 * requested mode is compatible with the modes of all other granted
2281 * locks
2282 */
2283
2284 if (queue_conflict(&r->res_convertqueue, lkb))
2285 return 0;
2286
2287 /*
2288 * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2289 * locks for a recovered rsb, on which lkb's have been rebuilt.
2290 * The lkb's may have been rebuilt on the queues in a different
2291 * order than they were in on the previous master. So, granting
2292 * queued conversions in order after recovery doesn't make sense
2293 * since the order hasn't been preserved anyway. The new order
2294 * could also have created a new "in place" conversion deadlock.
2295 * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2296 * After recovery, there would be no granted locks, and possibly
2297 * NL->EX, PR->EX, an in-place conversion deadlock.) So, after
2298 * recovery, grant conversions without considering order.
2299 */
2300
2301 if (conv && recover)
2302 return 1;
2303
2304 /*
2305 * 6-5: But the default algorithm for deciding whether to grant or
2306 * queue conversion requests does not by itself guarantee that such
2307 * requests are serviced on a "first come first serve" basis. This, in
2308 	 * turn, can lead to a phenomenon known as "indefinite postponement".
2309 *
2310 * 6-7: This issue is dealt with by using the optional QUECVT flag with
2311 * the system service employed to request a lock conversion. This flag
2312 * forces certain conversion requests to be queued, even if they are
2313 * compatible with the granted modes of other locks on the same
2314 * resource. Thus, the use of this flag results in conversion requests
2315 	 * being ordered on a "first come first serve" basis.
2316 *
2317 * DCT: This condition is all about new conversions being able to occur
2318 * "in place" while the lock remains on the granted queue (assuming
2319 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion
2320 * doesn't _have_ to go onto the convert queue where it's processed in
2321 * order. The "now" variable is necessary to distinguish converts
2322 * being received and processed for the first time now, because once a
2323 * convert is moved to the conversion queue the condition below applies
2324 * requiring fifo granting.
2325 */
2326
2327 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2328 return 1;
2329
2330 /*
2331 * Even if the convert is compat with all granted locks,
2332 * QUECVT forces it behind other locks on the convert queue.
2333 */
2334
2335 if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2336 if (list_empty(&r->res_convertqueue))
2337 return 1;
2338 else
2339 return 0;
2340 }
2341
2342 /*
2343 * The NOORDER flag is set to avoid the standard vms rules on grant
2344 * order.
2345 */
2346
2347 if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2348 return 1;
2349
2350 /*
2351 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2352 * granted until all other conversion requests ahead of it are granted
2353 * and/or canceled.
2354 */
2355
2356 if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2357 return 1;
2358
2359 /*
2360 * 6-4: By default, a new request is immediately granted only if all
2361 * three of the following conditions are satisfied when the request is
2362 * issued:
2363 * - The queue of ungranted conversion requests for the resource is
2364 * empty.
2365 * - The queue of ungranted new requests for the resource is empty.
2366 * - The mode of the new request is compatible with the most
2367 * restrictive mode of all granted locks on the resource.
2368 */
2369
2370 if (now && !conv && list_empty(&r->res_convertqueue) &&
2371 list_empty(&r->res_waitqueue))
2372 return 1;
2373
2374 /*
2375 * 6-4: Once a lock request is in the queue of ungranted new requests,
2376 * it cannot be granted until the queue of ungranted conversion
2377 * requests is empty, all ungranted new requests ahead of it are
2378 * granted and/or canceled, and it is compatible with the granted mode
2379 * of the most restrictive lock granted on the resource.
2380 */
2381
2382 if (!now && !conv && list_empty(&r->res_convertqueue) &&
2383 first_in_list(lkb, &r->res_waitqueue))
2384 return 1;
2385
2386 return 0;
2387 }
2388
2389 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2390 int recover, int *err)
2391 {
2392 int rv;
2393 int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2394 int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2395
2396 if (err)
2397 *err = 0;
2398
2399 rv = _can_be_granted(r, lkb, now, recover);
2400 if (rv)
2401 goto out;
2402
2403 /*
2404 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2405 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2406 * cancels one of the locks.
2407 */
2408
2409 if (is_convert && can_be_queued(lkb) &&
2410 conversion_deadlock_detect(r, lkb)) {
2411 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2412 lkb->lkb_grmode = DLM_LOCK_NL;
2413 set_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
2414 } else if (err) {
2415 *err = -EDEADLK;
2416 } else {
2417 log_print("can_be_granted deadlock %x now %d",
2418 lkb->lkb_id, now);
2419 dlm_dump_rsb(r);
2420 }
2421 goto out;
2422 }
2423
2424 /*
2425 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2426 * to grant a request in a mode other than the normal rqmode. It's a
2427 * simple way to provide a big optimization to applications that can
2428 * use them.
2429 */
2430
2431 if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2432 alt = DLM_LOCK_PR;
2433 else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2434 alt = DLM_LOCK_CW;
2435
2436 if (alt) {
2437 lkb->lkb_rqmode = alt;
2438 rv = _can_be_granted(r, lkb, now, 0);
2439 if (rv)
2440 set_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
2441 else
2442 lkb->lkb_rqmode = rqmode;
2443 }
2444 out:
2445 return rv;
2446 }
2447
2448 /* Returns the highest requested mode of all blocked conversions; sets
2449 cw if there's a blocked conversion to DLM_LOCK_CW. */
2450
2451 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2452 unsigned int *count)
2453 {
2454 struct dlm_lkb *lkb, *s;
2455 int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2456 int hi, demoted, quit, grant_restart, demote_restart;
2457 int deadlk;
2458
2459 quit = 0;
2460 restart:
2461 grant_restart = 0;
2462 demote_restart = 0;
2463 hi = DLM_LOCK_IV;
2464
2465 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2466 demoted = is_demoted(lkb);
2467 deadlk = 0;
2468
2469 if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2470 grant_lock_pending(r, lkb);
2471 grant_restart = 1;
2472 if (count)
2473 (*count)++;
2474 continue;
2475 }
2476
2477 if (!demoted && is_demoted(lkb)) {
2478 log_print("WARN: pending demoted %x node %d %s",
2479 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2480 demote_restart = 1;
2481 continue;
2482 }
2483
2484 if (deadlk) {
2485 /*
2486 			 * If the DLM_LKF_NODLCKWT flag is set and a conversion
2487 			 * deadlock is detected, we request a blocking AST and
2488 			 * leave it to the lock holder to down-convert (or
2489 			 * cancel) the conversion.
2489 */
2490 if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2491 if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2492 queue_bast(r, lkb, lkb->lkb_rqmode);
2493 lkb->lkb_highbast = lkb->lkb_rqmode;
2494 }
2495 } else {
2496 log_print("WARN: pending deadlock %x node %d %s",
2497 lkb->lkb_id, lkb->lkb_nodeid,
2498 r->res_name);
2499 dlm_dump_rsb(r);
2500 }
2501 continue;
2502 }
2503
2504 hi = max_t(int, lkb->lkb_rqmode, hi);
2505
2506 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2507 *cw = 1;
2508 }
2509
2510 if (grant_restart)
2511 goto restart;
2512 if (demote_restart && !quit) {
2513 quit = 1;
2514 goto restart;
2515 }
2516
2517 return max_t(int, high, hi);
2518 }
2519
2520 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2521 unsigned int *count)
2522 {
2523 struct dlm_lkb *lkb, *s;
2524
2525 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2526 if (can_be_granted(r, lkb, 0, 0, NULL)) {
2527 grant_lock_pending(r, lkb);
2528 if (count)
2529 (*count)++;
2530 } else {
2531 high = max_t(int, lkb->lkb_rqmode, high);
2532 if (lkb->lkb_rqmode == DLM_LOCK_CW)
2533 *cw = 1;
2534 }
2535 }
2536
2537 return high;
2538 }
2539
2540 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2541 on either the convert or waiting queue.
2542 high is the largest rqmode of all locks blocked on the convert or
2543 waiting queue. */
2544
2545 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2546 {
2547 if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2548 if (gr->lkb_highbast < DLM_LOCK_EX)
2549 return 1;
2550 return 0;
2551 }
2552
2553 if (gr->lkb_highbast < high &&
2554 !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2555 return 1;
2556 return 0;
2557 }
2558
2559 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2560 {
2561 struct dlm_lkb *lkb, *s;
2562 int high = DLM_LOCK_IV;
2563 int cw = 0;
2564
2565 if (!is_master(r)) {
2566 log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2567 dlm_dump_rsb(r);
2568 return;
2569 }
2570
2571 high = grant_pending_convert(r, high, &cw, count);
2572 high = grant_pending_wait(r, high, &cw, count);
2573
2574 if (high == DLM_LOCK_IV)
2575 return;
2576
2577 /*
2578 * If there are locks left on the wait/convert queue then send blocking
2579 * ASTs to granted locks based on the largest requested mode (high)
2580 * found above.
2581 */
2582
2583 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2584 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2585 if (cw && high == DLM_LOCK_PR &&
2586 lkb->lkb_grmode == DLM_LOCK_PR)
2587 queue_bast(r, lkb, DLM_LOCK_CW);
2588 else
2589 queue_bast(r, lkb, high);
2590 lkb->lkb_highbast = high;
2591 }
2592 }
2593 }
2594
2595 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2596 {
2597 if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2598 (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2599 if (gr->lkb_highbast < DLM_LOCK_EX)
2600 return 1;
2601 return 0;
2602 }
2603
2604 if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2605 return 1;
2606 return 0;
2607 }
2608
2609 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2610 struct dlm_lkb *lkb)
2611 {
2612 struct dlm_lkb *gr;
2613
2614 list_for_each_entry(gr, head, lkb_statequeue) {
2615 /* skip self when sending basts to convertqueue */
2616 if (gr == lkb)
2617 continue;
2618 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2619 queue_bast(r, gr, lkb->lkb_rqmode);
2620 gr->lkb_highbast = lkb->lkb_rqmode;
2621 }
2622 }
2623 }
2624
2625 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2626 {
2627 send_bast_queue(r, &r->res_grantqueue, lkb);
2628 }
2629
2630 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2631 {
2632 send_bast_queue(r, &r->res_grantqueue, lkb);
2633 send_bast_queue(r, &r->res_convertqueue, lkb);
2634 }
2635
2636 /* set_master(r, lkb) -- set the master nodeid of a resource
2637
2638 The purpose of this function is to set the nodeid field in the given
2639 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
2640 known, it can just be copied to the lkb and the function will return
2641 0. If the rsb's nodeid is _not_ known, it needs to be looked up
2642 before it can be copied to the lkb.
2643
2644 When the rsb nodeid is being looked up remotely, the initial lkb
2645 causing the lookup is kept on the ls_waiters list waiting for the
2646 lookup reply. Other lkb's waiting for the same rsb lookup are kept
2647 on the rsb's res_lookup list until the master is verified.
2648
2649 Return values:
2650 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2651 1: the rsb master is not available and the lkb has been placed on
2652 a wait queue
2653 */
2654
2655 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2656 {
2657 int our_nodeid = dlm_our_nodeid();
2658
2659 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2660 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2661 r->res_first_lkid = lkb->lkb_id;
2662 lkb->lkb_nodeid = r->res_nodeid;
2663 return 0;
2664 }
2665
2666 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2667 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2668 return 1;
2669 }
2670
2671 if (r->res_master_nodeid == our_nodeid) {
2672 lkb->lkb_nodeid = 0;
2673 return 0;
2674 }
2675
2676 if (r->res_master_nodeid) {
2677 lkb->lkb_nodeid = r->res_master_nodeid;
2678 return 0;
2679 }
2680
2681 if (dlm_dir_nodeid(r) == our_nodeid) {
2682 /* This is a somewhat unusual case; find_rsb will usually
2683 have set res_master_nodeid when dir nodeid is local, but
2684 there are cases where we become the dir node after we've
2685 		   passed find_rsb and go through _request_lock again.
2686 confirm_master() or process_lookup_list() needs to be
2687 called after this. */
2688 log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2689 lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2690 r->res_name);
2691 r->res_master_nodeid = our_nodeid;
2692 r->res_nodeid = 0;
2693 lkb->lkb_nodeid = 0;
2694 return 0;
2695 }
2696
2697 r->res_first_lkid = lkb->lkb_id;
2698 send_lookup(r, lkb);
2699 return 1;
2700 }
2701
2702 static void process_lookup_list(struct dlm_rsb *r)
2703 {
2704 struct dlm_lkb *lkb, *safe;
2705
2706 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2707 list_del_init(&lkb->lkb_rsb_lookup);
2708 _request_lock(r, lkb);
2709 }
2710 }
2711
2712 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2713
2714 static void confirm_master(struct dlm_rsb *r, int error)
2715 {
2716 struct dlm_lkb *lkb;
2717
2718 if (!r->res_first_lkid)
2719 return;
2720
2721 switch (error) {
2722 case 0:
2723 case -EINPROGRESS:
2724 r->res_first_lkid = 0;
2725 process_lookup_list(r);
2726 break;
2727
2728 case -EAGAIN:
2729 case -EBADR:
2730 case -ENOTBLK:
2731 /* the remote request failed and won't be retried (it was
2732 a NOQUEUE, or has been canceled/unlocked); make a waiting
2733 lkb the first_lkid */
2734
2735 r->res_first_lkid = 0;
2736
2737 if (!list_empty(&r->res_lookup)) {
2738 lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2739 lkb_rsb_lookup);
2740 list_del_init(&lkb->lkb_rsb_lookup);
2741 r->res_first_lkid = lkb->lkb_id;
2742 _request_lock(r, lkb);
2743 }
2744 break;
2745
2746 default:
2747 log_error(r->res_ls, "confirm_master unknown error %d", error);
2748 }
2749 }
2750
2751 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2752 int namelen, void (*ast)(void *astparam),
2753 void *astparam,
2754 void (*bast)(void *astparam, int mode),
2755 struct dlm_args *args)
2756 {
2757 int rv = -EINVAL;
2758
2759 /* check for invalid arg usage */
2760
2761 if (mode < 0 || mode > DLM_LOCK_EX)
2762 goto out;
2763
2764 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2765 goto out;
2766
2767 if (flags & DLM_LKF_CANCEL)
2768 goto out;
2769
2770 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2771 goto out;
2772
2773 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2774 goto out;
2775
2776 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2777 goto out;
2778
2779 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2780 goto out;
2781
2782 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2783 goto out;
2784
2785 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2786 goto out;
2787
2788 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2789 goto out;
2790
2791 if (!ast || !lksb)
2792 goto out;
2793
2794 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2795 goto out;
2796
2797 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2798 goto out;
2799
2800 /* these args will be copied to the lkb in validate_lock_args,
2801 it cannot be done now because when converting locks, fields in
2802 an active lkb cannot be modified before locking the rsb */
2803
2804 args->flags = flags;
2805 args->astfn = ast;
2806 args->astparam = astparam;
2807 args->bastfn = bast;
2808 args->mode = mode;
2809 args->lksb = lksb;
2810 rv = 0;
2811 out:
2812 return rv;
2813 }
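
/*
 * For illustration only: combinations rejected by the checks above include
 * DLM_LKF_EXPEDITE on anything other than a new DLM_LOCK_NL request,
 * DLM_LKF_QUECVT or DLM_LKF_CONVDEADLK without DLM_LKF_CONVERT,
 * DLM_LKF_VALBLK without lksb->sb_lvbptr, and DLM_LKF_CONVERT without
 * lksb->sb_lkid.  DLM_LKF_CANCEL is rejected here because a cancel goes
 * through dlm_unlock()/set_unlock_args() instead.
 */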
2814
2815 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2816 {
2817 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2818 DLM_LKF_FORCEUNLOCK))
2819 return -EINVAL;
2820
2821 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2822 return -EINVAL;
2823
2824 args->flags = flags;
2825 args->astparam = astarg;
2826 return 0;
2827 }
2828
2829 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2830 struct dlm_args *args)
2831 {
2832 int rv = -EBUSY;
2833
2834 if (args->flags & DLM_LKF_CONVERT) {
2835 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2836 goto out;
2837
2838 /* lock not allowed if there's any op in progress */
2839 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2840 goto out;
2841
2842 if (is_overlap(lkb))
2843 goto out;
2844
2845 rv = -EINVAL;
2846 if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
2847 goto out;
2848
2849 if (args->flags & DLM_LKF_QUECVT &&
2850 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2851 goto out;
2852 }
2853
2854 lkb->lkb_exflags = args->flags;
2855 dlm_set_sbflags_val(lkb, 0);
2856 lkb->lkb_astfn = args->astfn;
2857 lkb->lkb_astparam = args->astparam;
2858 lkb->lkb_bastfn = args->bastfn;
2859 lkb->lkb_rqmode = args->mode;
2860 lkb->lkb_lksb = args->lksb;
2861 lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2862 lkb->lkb_ownpid = (int) current->pid;
2863 rv = 0;
2864 out:
2865 switch (rv) {
2866 case 0:
2867 break;
2868 case -EINVAL:
2869 /* annoy the user because dlm usage is wrong */
2870 WARN_ON(1);
2871 log_error(ls, "%s %d %x %x %x %d %d", __func__,
2872 rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2873 lkb->lkb_status, lkb->lkb_wait_type);
2874 break;
2875 default:
2876 log_debug(ls, "%s %d %x %x %x %d %d", __func__,
2877 rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2878 lkb->lkb_status, lkb->lkb_wait_type);
2879 break;
2880 }
2881
2882 return rv;
2883 }
2884
2885 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2886 for success */
2887
2888 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2889 because there may be a lookup in progress and it's valid to do
2890 cancel/unlockf on it */
2891
2892 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2893 {
2894 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2895 int rv = -EBUSY;
2896
2897 /* normal unlock not allowed if there's any op in progress */
2898 if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) &&
2899 (lkb->lkb_wait_type || lkb->lkb_wait_count))
2900 goto out;
2901
2902 /* an lkb may be waiting for an rsb lookup to complete where the
2903 lookup was initiated by another lock */
2904
2905 if (!list_empty(&lkb->lkb_rsb_lookup)) {
2906 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2907 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2908 list_del_init(&lkb->lkb_rsb_lookup);
2909 queue_cast(lkb->lkb_resource, lkb,
2910 args->flags & DLM_LKF_CANCEL ?
2911 -DLM_ECANCEL : -DLM_EUNLOCK);
2912 unhold_lkb(lkb); /* undoes create_lkb() */
2913 }
2914 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2915 goto out;
2916 }
2917
2918 rv = -EINVAL;
2919 if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
2920 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2921 dlm_print_lkb(lkb);
2922 goto out;
2923 }
2924
2925 /* an lkb may still exist even though the lock is EOL'ed due to a
2926 * cancel, unlock or failed noqueue request; an app can't use these
2927 * locks; return same error as if the lkid had not been found at all
2928 */
2929
2930 if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
2931 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2932 rv = -ENOENT;
2933 goto out;
2934 }
2935
2936 if (is_overlap_unlock(lkb))
2937 goto out;
2938
2939 /* cancel not allowed with another cancel/unlock in progress */
2940
2941 if (args->flags & DLM_LKF_CANCEL) {
2942 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2943 goto out;
2944
2945 if (is_overlap_cancel(lkb))
2946 goto out;
2947
2948 if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2949 set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2950 rv = -EBUSY;
2951 goto out;
2952 }
2953
2954 /* there's nothing to cancel */
2955 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2956 !lkb->lkb_wait_type) {
2957 rv = -EBUSY;
2958 goto out;
2959 }
2960
2961 switch (lkb->lkb_wait_type) {
2962 case DLM_MSG_LOOKUP:
2963 case DLM_MSG_REQUEST:
2964 set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2965 rv = -EBUSY;
2966 goto out;
2967 case DLM_MSG_UNLOCK:
2968 case DLM_MSG_CANCEL:
2969 goto out;
2970 }
2971 /* add_to_waiters() will set OVERLAP_CANCEL */
2972 goto out_ok;
2973 }
2974
2975 /* do we need to allow a force-unlock if there's a normal unlock
2976 already in progress? in what conditions could the normal unlock
2977 fail such that we'd want to send a force-unlock to be sure? */
2978
2979 if (args->flags & DLM_LKF_FORCEUNLOCK) {
2980 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2981 goto out;
2982
2983 if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2984 set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2985 rv = -EBUSY;
2986 goto out;
2987 }
2988
2989 switch (lkb->lkb_wait_type) {
2990 case DLM_MSG_LOOKUP:
2991 case DLM_MSG_REQUEST:
2992 set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2993 rv = -EBUSY;
2994 goto out;
2995 case DLM_MSG_UNLOCK:
2996 goto out;
2997 }
2998 /* add_to_waiters() will set OVERLAP_UNLOCK */
2999 }
3000
3001 out_ok:
3002 /* an overlapping op shouldn't blow away exflags from other op */
3003 lkb->lkb_exflags |= args->flags;
3004 dlm_set_sbflags_val(lkb, 0);
3005 lkb->lkb_astparam = args->astparam;
3006 rv = 0;
3007 out:
3008 switch (rv) {
3009 case 0:
3010 break;
3011 case -EINVAL:
3012 /* annoy the user because dlm usage is wrong */
3013 WARN_ON(1);
3014 log_error(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
3015 lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
3016 args->flags, lkb->lkb_wait_type,
3017 lkb->lkb_resource->res_name);
3018 break;
3019 default:
3020 log_debug(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
3021 lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
3022 args->flags, lkb->lkb_wait_type,
3023 lkb->lkb_resource->res_name);
3024 break;
3025 }
3026
3027 return rv;
3028 }
3029
3030 /*
3031 * Four stage 4 varieties:
3032 * do_request(), do_convert(), do_unlock(), do_cancel()
3033 * These are called on the master node for the given lock and
3034 * from the central locking logic.
3035 */
3036
3037 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3038 {
3039 int error = 0;
3040
3041 if (can_be_granted(r, lkb, 1, 0, NULL)) {
3042 grant_lock(r, lkb);
3043 queue_cast(r, lkb, 0);
3044 goto out;
3045 }
3046
3047 if (can_be_queued(lkb)) {
3048 error = -EINPROGRESS;
3049 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3050 goto out;
3051 }
3052
3053 error = -EAGAIN;
3054 queue_cast(r, lkb, -EAGAIN);
3055 out:
3056 return error;
3057 }
3058
3059 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3060 int error)
3061 {
3062 switch (error) {
3063 case -EAGAIN:
3064 if (force_blocking_asts(lkb))
3065 send_blocking_asts_all(r, lkb);
3066 break;
3067 case -EINPROGRESS:
3068 send_blocking_asts(r, lkb);
3069 break;
3070 }
3071 }
3072
3073 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3074 {
3075 int error = 0;
3076 int deadlk = 0;
3077
3078 /* changing an existing lock may allow others to be granted */
3079
3080 if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3081 grant_lock(r, lkb);
3082 queue_cast(r, lkb, 0);
3083 goto out;
3084 }
3085
3086 /* can_be_granted() detected that this lock would block in a conversion
3087 deadlock, so we leave it on the granted queue and return EDEADLK in
3088 the ast for the convert. */
3089
3090 if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
3091 /* it's left on the granted queue */
3092 revert_lock(r, lkb);
3093 queue_cast(r, lkb, -EDEADLK);
3094 error = -EDEADLK;
3095 goto out;
3096 }
3097
3098 /* is_demoted() means the can_be_granted() above set the grmode
3099 to NL, and left us on the granted queue. This auto-demotion
3100 (due to CONVDEADLK) might mean other locks, and/or this lock, are
3101 now grantable. We have to try to grant other converting locks
3102 before we try again to grant this one. */
3103
3104 if (is_demoted(lkb)) {
3105 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3106 if (_can_be_granted(r, lkb, 1, 0)) {
3107 grant_lock(r, lkb);
3108 queue_cast(r, lkb, 0);
3109 goto out;
3110 }
3111 /* else fall through and move to convert queue */
3112 }
3113
3114 if (can_be_queued(lkb)) {
3115 error = -EINPROGRESS;
3116 del_lkb(r, lkb);
3117 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3118 goto out;
3119 }
3120
3121 error = -EAGAIN;
3122 queue_cast(r, lkb, -EAGAIN);
3123 out:
3124 return error;
3125 }
3126
3127 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3128 int error)
3129 {
3130 switch (error) {
3131 case 0:
3132 grant_pending_locks(r, NULL);
3133 /* grant_pending_locks also sends basts */
3134 break;
3135 case -EAGAIN:
3136 if (force_blocking_asts(lkb))
3137 send_blocking_asts_all(r, lkb);
3138 break;
3139 case -EINPROGRESS:
3140 send_blocking_asts(r, lkb);
3141 break;
3142 }
3143 }
3144
3145 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3146 {
3147 remove_lock(r, lkb);
3148 queue_cast(r, lkb, -DLM_EUNLOCK);
3149 return -DLM_EUNLOCK;
3150 }
3151
3152 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3153 int error)
3154 {
3155 grant_pending_locks(r, NULL);
3156 }
3157
3158 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3159
3160 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3161 {
3162 int error;
3163
3164 error = revert_lock(r, lkb);
3165 if (error) {
3166 queue_cast(r, lkb, -DLM_ECANCEL);
3167 return -DLM_ECANCEL;
3168 }
3169 return 0;
3170 }
3171
3172 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3173 int error)
3174 {
3175 if (error)
3176 grant_pending_locks(r, NULL);
3177 }
3178
3179 /*
3180 * Four stage 3 varieties:
3181 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3182 */
3183
3184 /* add a new lkb to a possibly new rsb, called by requesting process */
3185
3186 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3187 {
3188 int error;
3189
3190 /* set_master: sets lkb nodeid from r */
3191
3192 error = set_master(r, lkb);
3193 if (error < 0)
3194 goto out;
3195 if (error) {
3196 error = 0;
3197 goto out;
3198 }
3199
3200 if (is_remote(r)) {
3201 /* receive_request() calls do_request() on remote node */
3202 error = send_request(r, lkb);
3203 } else {
3204 error = do_request(r, lkb);
3205 /* for remote locks the request_reply is sent
3206 between do_request and do_request_effects */
3207 do_request_effects(r, lkb, error);
3208 }
3209 out:
3210 return error;
3211 }
3212
3213 /* change some property of an existing lkb, e.g. mode */
3214
3215 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3216 {
3217 int error;
3218
3219 if (is_remote(r)) {
3220 /* receive_convert() calls do_convert() on remote node */
3221 error = send_convert(r, lkb);
3222 } else {
3223 error = do_convert(r, lkb);
3224 /* for remote locks the convert_reply is sent
3225 between do_convert and do_convert_effects */
3226 do_convert_effects(r, lkb, error);
3227 }
3228
3229 return error;
3230 }
3231
3232 /* remove an existing lkb from the granted queue */
3233
3234 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3235 {
3236 int error;
3237
3238 if (is_remote(r)) {
3239 /* receive_unlock() calls do_unlock() on remote node */
3240 error = send_unlock(r, lkb);
3241 } else {
3242 error = do_unlock(r, lkb);
3243 /* for remote locks the unlock_reply is sent
3244 between do_unlock and do_unlock_effects */
3245 do_unlock_effects(r, lkb, error);
3246 }
3247
3248 return error;
3249 }
3250
3251 /* remove an existing lkb from the convert or wait queue */
3252
3253 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3254 {
3255 int error;
3256
3257 if (is_remote(r)) {
3258 /* receive_cancel() calls do_cancel() on remote node */
3259 error = send_cancel(r, lkb);
3260 } else {
3261 error = do_cancel(r, lkb);
3262 /* for remote locks the cancel_reply is sent
3263 between do_cancel and do_cancel_effects */
3264 do_cancel_effects(r, lkb, error);
3265 }
3266
3267 return error;
3268 }
3269
3270 /*
3271 * Four stage 2 varieties:
3272 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3273 */
3274
3275 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3276 const void *name, int len,
3277 struct dlm_args *args)
3278 {
3279 struct dlm_rsb *r;
3280 int error;
3281
3282 error = validate_lock_args(ls, lkb, args);
3283 if (error)
3284 return error;
3285
3286 error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3287 if (error)
3288 return error;
3289
3290 lock_rsb(r);
3291
3292 attach_lkb(r, lkb);
3293 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3294
3295 error = _request_lock(r, lkb);
3296
3297 unlock_rsb(r);
3298 put_rsb(r);
3299 return error;
3300 }
3301
3302 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3303 struct dlm_args *args)
3304 {
3305 struct dlm_rsb *r;
3306 int error;
3307
3308 r = lkb->lkb_resource;
3309
3310 hold_rsb(r);
3311 lock_rsb(r);
3312
3313 error = validate_lock_args(ls, lkb, args);
3314 if (error)
3315 goto out;
3316
3317 error = _convert_lock(r, lkb);
3318 out:
3319 unlock_rsb(r);
3320 put_rsb(r);
3321 return error;
3322 }
3323
3324 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3325 struct dlm_args *args)
3326 {
3327 struct dlm_rsb *r;
3328 int error;
3329
3330 r = lkb->lkb_resource;
3331
3332 hold_rsb(r);
3333 lock_rsb(r);
3334
3335 error = validate_unlock_args(lkb, args);
3336 if (error)
3337 goto out;
3338
3339 error = _unlock_lock(r, lkb);
3340 out:
3341 unlock_rsb(r);
3342 put_rsb(r);
3343 return error;
3344 }
3345
3346 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3347 struct dlm_args *args)
3348 {
3349 struct dlm_rsb *r;
3350 int error;
3351
3352 r = lkb->lkb_resource;
3353
3354 hold_rsb(r);
3355 lock_rsb(r);
3356
3357 error = validate_unlock_args(lkb, args);
3358 if (error)
3359 goto out;
3360
3361 error = _cancel_lock(r, lkb);
3362 out:
3363 unlock_rsb(r);
3364 put_rsb(r);
3365 return error;
3366 }
3367
3368 /*
3369 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
3370 */
3371
3372 int dlm_lock(dlm_lockspace_t *lockspace,
3373 int mode,
3374 struct dlm_lksb *lksb,
3375 uint32_t flags,
3376 const void *name,
3377 unsigned int namelen,
3378 uint32_t parent_lkid,
3379 void (*ast) (void *astarg),
3380 void *astarg,
3381 void (*bast) (void *astarg, int mode))
3382 {
3383 struct dlm_ls *ls;
3384 struct dlm_lkb *lkb;
3385 struct dlm_args args;
3386 int error, convert = flags & DLM_LKF_CONVERT;
3387
3388 ls = dlm_find_lockspace_local(lockspace);
3389 if (!ls)
3390 return -EINVAL;
3391
3392 dlm_lock_recovery(ls);
3393
3394 if (convert)
3395 error = find_lkb(ls, lksb->sb_lkid, &lkb);
3396 else
3397 error = create_lkb(ls, &lkb);
3398
3399 if (error)
3400 goto out;
3401
3402 trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
3403
3404 error = set_lock_args(mode, lksb, flags, namelen, ast, astarg, bast,
3405 &args);
3406 if (error)
3407 goto out_put;
3408
3409 if (convert)
3410 error = convert_lock(ls, lkb, &args);
3411 else
3412 error = request_lock(ls, lkb, name, namelen, &args);
3413
3414 if (error == -EINPROGRESS)
3415 error = 0;
3416 out_put:
3417 trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true);
3418
3419 if (convert || error)
3420 __put_lkb(ls, lkb);
3421 if (error == -EAGAIN || error == -EDEADLK)
3422 error = 0;
3423 out:
3424 dlm_unlock_recovery(ls);
3425 dlm_put_lockspace(ls);
3426 return error;
3427 }
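
/*
 * Illustrative sketch only (not part of the DLM code): a kernel user that
 * has joined a lockspace (e.g. via dlm_new_lockspace(), returning "ls")
 * might request an exclusive lock and wait for the completion ast before
 * using the resource.  The resource name "myres", the my_ast() helper and
 * the completion-based waiting are assumptions made for this example; see
 * include/linux/dlm.h for the authoritative interface.
 *
 *	static void my_ast(void *arg)
 *	{
 *		complete(arg);
 *	}
 *
 *	(in the caller:)
 *
 *	struct dlm_lksb lksb = {};
 *	DECLARE_COMPLETION_ONSTACK(done);
 *	int rv;
 *
 *	rv = dlm_lock(ls, DLM_LOCK_EX, &lksb, 0, "myres", 5, 0,
 *		      my_ast, &done, NULL);
 *	if (!rv) {
 *		wait_for_completion(&done);
 *		rv = lksb.sb_status;	(0 means the lock was granted)
 *	}
 */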
3428
3429 int dlm_unlock(dlm_lockspace_t *lockspace,
3430 uint32_t lkid,
3431 uint32_t flags,
3432 struct dlm_lksb *lksb,
3433 void *astarg)
3434 {
3435 struct dlm_ls *ls;
3436 struct dlm_lkb *lkb;
3437 struct dlm_args args;
3438 int error;
3439
3440 ls = dlm_find_lockspace_local(lockspace);
3441 if (!ls)
3442 return -EINVAL;
3443
3444 dlm_lock_recovery(ls);
3445
3446 error = find_lkb(ls, lkid, &lkb);
3447 if (error)
3448 goto out;
3449
3450 trace_dlm_unlock_start(ls, lkb, flags);
3451
3452 error = set_unlock_args(flags, astarg, &args);
3453 if (error)
3454 goto out_put;
3455
3456 if (flags & DLM_LKF_CANCEL)
3457 error = cancel_lock(ls, lkb, &args);
3458 else
3459 error = unlock_lock(ls, lkb, &args);
3460
3461 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3462 error = 0;
3463 if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3464 error = 0;
3465 out_put:
3466 trace_dlm_unlock_end(ls, lkb, flags, error);
3467
3468 dlm_put_lkb(lkb);
3469 out:
3470 dlm_unlock_recovery(ls);
3471 dlm_put_lockspace(ls);
3472 return error;
3473 }
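
/*
 * Illustrative sketch only (not part of the DLM code), continuing the
 * dlm_lock() example above: the lock id to unlock comes from lksb.sb_lkid,
 * which was filled in when the request was created; the lksb and the
 * completion are the same assumed helpers as above.
 *
 *	rv = dlm_unlock(ls, lksb.sb_lkid, 0, &lksb, &done);
 *	if (!rv)
 *		wait_for_completion(&done);
 *
 * On completion the ast reports -DLM_EUNLOCK in sb_status (or -DLM_ECANCEL
 * if DLM_LKF_CANCEL was passed in flags), as queued by do_unlock() and
 * do_cancel() above.
 */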
3474
3475 /*
3476 * send/receive routines for remote operations and replies
3477 *
3478 * send_args
3479 * send_common
3480 * send_request receive_request
3481 * send_convert receive_convert
3482 * send_unlock receive_unlock
3483 * send_cancel receive_cancel
3484 * send_grant receive_grant
3485 * send_bast receive_bast
3486 * send_lookup receive_lookup
3487 * send_remove receive_remove
3488 *
3489 * send_common_reply
3490 * receive_request_reply send_request_reply
3491 * receive_convert_reply send_convert_reply
3492 * receive_unlock_reply send_unlock_reply
3493 * receive_cancel_reply send_cancel_reply
3494 * receive_lookup_reply send_lookup_reply
3495 */
3496
3497 static int _create_message(struct dlm_ls *ls, int mb_len,
3498 int to_nodeid, int mstype,
3499 struct dlm_message **ms_ret,
3500 struct dlm_mhandle **mh_ret)
3501 {
3502 struct dlm_message *ms;
3503 struct dlm_mhandle *mh;
3504 char *mb;
3505
3506 /* get_buffer gives us a message handle (mh) that we need to
3507 pass into midcomms_commit and a message buffer (mb) that we
3508 write our data into */
3509
3510 mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb);
3511 if (!mh)
3512 return -ENOBUFS;
3513
3514 ms = (struct dlm_message *) mb;
3515
3516 ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3517 ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
3518 ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
3519 ms->m_header.h_length = cpu_to_le16(mb_len);
3520 ms->m_header.h_cmd = DLM_MSG;
3521
3522 ms->m_type = cpu_to_le32(mstype);
3523
3524 *mh_ret = mh;
3525 *ms_ret = ms;
3526 return 0;
3527 }
3528
3529 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3530 int to_nodeid, int mstype,
3531 struct dlm_message **ms_ret,
3532 struct dlm_mhandle **mh_ret)
3533 {
3534 int mb_len = sizeof(struct dlm_message);
3535
3536 switch (mstype) {
3537 case DLM_MSG_REQUEST:
3538 case DLM_MSG_LOOKUP:
3539 case DLM_MSG_REMOVE:
3540 mb_len += r->res_length;
3541 break;
3542 case DLM_MSG_CONVERT:
3543 case DLM_MSG_UNLOCK:
3544 case DLM_MSG_REQUEST_REPLY:
3545 case DLM_MSG_CONVERT_REPLY:
3546 case DLM_MSG_GRANT:
3547 if (lkb && lkb->lkb_lvbptr && (lkb->lkb_exflags & DLM_LKF_VALBLK))
3548 mb_len += r->res_ls->ls_lvblen;
3549 break;
3550 }
3551
3552 return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3553 ms_ret, mh_ret);
3554 }
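
/* Worked example (informal): for a DLM_MSG_REQUEST on a resource whose name is
   24 bytes, mb_len = sizeof(struct dlm_message) + 24 and the name is copied
   into ms->m_extra by send_args().  For a DLM_MSG_CONVERT with DLM_LKF_VALBLK
   in a lockspace with a 32-byte lvblen, mb_len = sizeof(struct dlm_message) +
   32 so the lvb fits in m_extra. */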
3555
3556 /* further lowcomms enhancements or alternate implementations may make
3557 the return value from this function useful at some point */
3558
3559 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms,
3560 const void *name, int namelen)
3561 {
3562 dlm_midcomms_commit_mhandle(mh, name, namelen);
3563 return 0;
3564 }
3565
3566 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3567 struct dlm_message *ms)
3568 {
3569 ms->m_nodeid = cpu_to_le32(lkb->lkb_nodeid);
3570 ms->m_pid = cpu_to_le32(lkb->lkb_ownpid);
3571 ms->m_lkid = cpu_to_le32(lkb->lkb_id);
3572 ms->m_remid = cpu_to_le32(lkb->lkb_remid);
3573 ms->m_exflags = cpu_to_le32(lkb->lkb_exflags);
3574 ms->m_sbflags = cpu_to_le32(dlm_sbflags_val(lkb));
3575 ms->m_flags = cpu_to_le32(dlm_dflags_val(lkb));
3576 ms->m_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
3577 ms->m_status = cpu_to_le32(lkb->lkb_status);
3578 ms->m_grmode = cpu_to_le32(lkb->lkb_grmode);
3579 ms->m_rqmode = cpu_to_le32(lkb->lkb_rqmode);
3580 ms->m_hash = cpu_to_le32(r->res_hash);
3581
3582 /* m_result and m_bastmode are set from function args,
3583 not from lkb fields */
3584
3585 if (lkb->lkb_bastfn)
3586 ms->m_asts |= cpu_to_le32(DLM_CB_BAST);
3587 if (lkb->lkb_astfn)
3588 ms->m_asts |= cpu_to_le32(DLM_CB_CAST);
3589
3590 /* compare with switch in create_message; send_remove() doesn't
3591 use send_args() */
3592
3593 switch (ms->m_type) {
3594 case cpu_to_le32(DLM_MSG_REQUEST):
3595 case cpu_to_le32(DLM_MSG_LOOKUP):
3596 memcpy(ms->m_extra, r->res_name, r->res_length);
3597 break;
3598 case cpu_to_le32(DLM_MSG_CONVERT):
3599 case cpu_to_le32(DLM_MSG_UNLOCK):
3600 case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3601 case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3602 case cpu_to_le32(DLM_MSG_GRANT):
3603 if (!lkb->lkb_lvbptr || !(lkb->lkb_exflags & DLM_LKF_VALBLK))
3604 break;
3605 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3606 break;
3607 }
3608 }
3609
3610 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3611 {
3612 struct dlm_message *ms;
3613 struct dlm_mhandle *mh;
3614 int to_nodeid, error;
3615
3616 to_nodeid = r->res_nodeid;
3617
3618 add_to_waiters(lkb, mstype, to_nodeid);
3619 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3620 if (error)
3621 goto fail;
3622
3623 send_args(r, lkb, ms);
3624
3625 error = send_message(mh, ms, r->res_name, r->res_length);
3626 if (error)
3627 goto fail;
3628 return 0;
3629
3630 fail:
3631 remove_from_waiters(lkb, msg_reply_type(mstype));
3632 return error;
3633 }
3634
3635 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3636 {
3637 return send_common(r, lkb, DLM_MSG_REQUEST);
3638 }
3639
3640 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3641 {
3642 int error;
3643
3644 error = send_common(r, lkb, DLM_MSG_CONVERT);
3645
3646 /* down conversions go without a reply from the master */
3647 if (!error && down_conversion(lkb)) {
3648 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3649 r->res_ls->ls_local_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
3650 r->res_ls->ls_local_ms.m_result = 0;
3651 __receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms, true);
3652 }
3653
3654 return error;
3655 }
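
/* Informal example of a down conversion: converting a lock from DLM_LOCK_EX
   down to DLM_LOCK_PR, or from PW down to NL, asks for a strictly weaker mode,
   which the master can always grant immediately, so no CONVERT_REPLY is sent.
   (Conversions between PR and CW are excluded; see middle_conversion().)  The
   fake local reply built above, with m_result = 0, is what moves the lkb to
   the granted state on this node. */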
3656
3657 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3658 MASTER_UNCERTAIN to force the next request on the rsb to confirm
3659 that the master is still correct. */
3660
3661 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3662 {
3663 return send_common(r, lkb, DLM_MSG_UNLOCK);
3664 }
3665
3666 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3667 {
3668 return send_common(r, lkb, DLM_MSG_CANCEL);
3669 }
3670
3671 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3672 {
3673 struct dlm_message *ms;
3674 struct dlm_mhandle *mh;
3675 int to_nodeid, error;
3676
3677 to_nodeid = lkb->lkb_nodeid;
3678
3679 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3680 if (error)
3681 goto out;
3682
3683 send_args(r, lkb, ms);
3684
3685 ms->m_result = 0;
3686
3687 error = send_message(mh, ms, r->res_name, r->res_length);
3688 out:
3689 return error;
3690 }
3691
3692 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3693 {
3694 struct dlm_message *ms;
3695 struct dlm_mhandle *mh;
3696 int to_nodeid, error;
3697
3698 to_nodeid = lkb->lkb_nodeid;
3699
3700 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3701 if (error)
3702 goto out;
3703
3704 send_args(r, lkb, ms);
3705
3706 ms->m_bastmode = cpu_to_le32(mode);
3707
3708 error = send_message(mh, ms, r->res_name, r->res_length);
3709 out:
3710 return error;
3711 }
3712
3713 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3714 {
3715 struct dlm_message *ms;
3716 struct dlm_mhandle *mh;
3717 int to_nodeid, error;
3718
3719 to_nodeid = dlm_dir_nodeid(r);
3720
3721 add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3722 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3723 if (error)
3724 goto fail;
3725
3726 send_args(r, lkb, ms);
3727
3728 error = send_message(mh, ms, r->res_name, r->res_length);
3729 if (error)
3730 goto fail;
3731 return 0;
3732
3733 fail:
3734 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3735 return error;
3736 }
3737
3738 static int send_remove(struct dlm_rsb *r)
3739 {
3740 struct dlm_message *ms;
3741 struct dlm_mhandle *mh;
3742 int to_nodeid, error;
3743
3744 to_nodeid = dlm_dir_nodeid(r);
3745
3746 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3747 if (error)
3748 goto out;
3749
3750 memcpy(ms->m_extra, r->res_name, r->res_length);
3751 ms->m_hash = cpu_to_le32(r->res_hash);
3752
3753 error = send_message(mh, ms, r->res_name, r->res_length);
3754 out:
3755 return error;
3756 }
3757
3758 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3759 int mstype, int rv)
3760 {
3761 struct dlm_message *ms;
3762 struct dlm_mhandle *mh;
3763 int to_nodeid, error;
3764
3765 to_nodeid = lkb->lkb_nodeid;
3766
3767 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3768 if (error)
3769 goto out;
3770
3771 send_args(r, lkb, ms);
3772
3773 ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3774
3775 error = send_message(mh, ms, r->res_name, r->res_length);
3776 out:
3777 return error;
3778 }
3779
3780 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3781 {
3782 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3783 }
3784
3785 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3786 {
3787 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3788 }
3789
3790 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3791 {
3792 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3793 }
3794
3795 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3796 {
3797 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3798 }
3799
3800 static int send_lookup_reply(struct dlm_ls *ls,
3801 const struct dlm_message *ms_in, int ret_nodeid,
3802 int rv)
3803 {
3804 struct dlm_rsb *r = &ls->ls_local_rsb;
3805 struct dlm_message *ms;
3806 struct dlm_mhandle *mh;
3807 int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
3808
3809 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3810 if (error)
3811 goto out;
3812
3813 ms->m_lkid = ms_in->m_lkid;
3814 ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3815 ms->m_nodeid = cpu_to_le32(ret_nodeid);
3816
3817 error = send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in));
3818 out:
3819 return error;
3820 }
3821
3822 /* which args we save from a received message depends heavily on the type
3823 of message, unlike the send side where we can safely send everything about
3824 the lkb for any type of message */
3825
3826 static void receive_flags(struct dlm_lkb *lkb, const struct dlm_message *ms)
3827 {
3828 lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
3829 dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3830 dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3831 }
3832
3833 static void receive_flags_reply(struct dlm_lkb *lkb,
3834 const struct dlm_message *ms,
3835 bool local)
3836 {
3837 if (local)
3838 return;
3839
3840 dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3841 dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3842 }
3843
3844 static int receive_extralen(const struct dlm_message *ms)
3845 {
3846 return (le16_to_cpu(ms->m_header.h_length) -
3847 sizeof(struct dlm_message));
3848 }
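
/* Example: a received DLM_MSG_REQUEST carrying a 16-byte resource name has
   h_length = sizeof(struct dlm_message) + 16, so receive_extralen() returns
   16, the number of payload bytes sitting in ms->m_extra. */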
3849
3850 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3851 const struct dlm_message *ms)
3852 {
3853 int len;
3854
3855 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3856 if (!lkb->lkb_lvbptr)
3857 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3858 if (!lkb->lkb_lvbptr)
3859 return -ENOMEM;
3860 len = receive_extralen(ms);
3861 if (len > ls->ls_lvblen)
3862 len = ls->ls_lvblen;
3863 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3864 }
3865 return 0;
3866 }
3867
3868 static void fake_bastfn(void *astparam, int mode)
3869 {
3870 log_print("fake_bastfn should not be called");
3871 }
3872
3873 static void fake_astfn(void *astparam)
3874 {
3875 log_print("fake_astfn should not be called");
3876 }
3877
3878 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3879 const struct dlm_message *ms)
3880 {
3881 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3882 lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
3883 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3884 lkb->lkb_grmode = DLM_LOCK_IV;
3885 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3886
3887 lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;
3888 lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL;
3889
3890 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3891 /* lkb was just created so there won't be an lvb yet */
3892 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3893 if (!lkb->lkb_lvbptr)
3894 return -ENOMEM;
3895 }
3896
3897 return 0;
3898 }
3899
3900 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3901 const struct dlm_message *ms)
3902 {
3903 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3904 return -EBUSY;
3905
3906 if (receive_lvb(ls, lkb, ms))
3907 return -ENOMEM;
3908
3909 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3910 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
3911
3912 return 0;
3913 }
3914
3915 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3916 const struct dlm_message *ms)
3917 {
3918 if (receive_lvb(ls, lkb, ms))
3919 return -ENOMEM;
3920 return 0;
3921 }
3922
3923 /* We fill in the local-lkb fields with the info that send_xxxx_reply()
3924 uses to send a reply and that the remote end uses to process the reply. */
3925
3926 static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms)
3927 {
3928 struct dlm_lkb *lkb = &ls->ls_local_lkb;
3929 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3930 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3931 }
3932
3933 /* This is called after the rsb is locked so that we can safely inspect
3934 fields in the lkb. */
3935
3936 static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
3937 {
3938 int from = le32_to_cpu(ms->m_header.h_nodeid);
3939 int error = 0;
3940
3941 /* currently mixing of user/kernel locks is not supported */
3942 if (ms->m_flags & cpu_to_le32(BIT(DLM_DFL_USER_BIT)) &&
3943 !test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
3944 log_error(lkb->lkb_resource->res_ls,
3945 "got user dlm message for a kernel lock");
3946 error = -EINVAL;
3947 goto out;
3948 }
3949
3950 switch (ms->m_type) {
3951 case cpu_to_le32(DLM_MSG_CONVERT):
3952 case cpu_to_le32(DLM_MSG_UNLOCK):
3953 case cpu_to_le32(DLM_MSG_CANCEL):
3954 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3955 error = -EINVAL;
3956 break;
3957
3958 case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3959 case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
3960 case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
3961 case cpu_to_le32(DLM_MSG_GRANT):
3962 case cpu_to_le32(DLM_MSG_BAST):
3963 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3964 error = -EINVAL;
3965 break;
3966
3967 case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3968 if (!is_process_copy(lkb))
3969 error = -EINVAL;
3970 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3971 error = -EINVAL;
3972 break;
3973
3974 default:
3975 error = -EINVAL;
3976 }
3977
3978 out:
3979 if (error)
3980 log_error(lkb->lkb_resource->res_ls,
3981 "ignore invalid message %d from %d %x %x %x %d",
3982 le32_to_cpu(ms->m_type), from, lkb->lkb_id,
3983 lkb->lkb_remid, dlm_iflags_val(lkb),
3984 lkb->lkb_nodeid);
3985 return error;
3986 }
3987
3988 static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms)
3989 {
3990 struct dlm_lkb *lkb;
3991 struct dlm_rsb *r;
3992 int from_nodeid;
3993 int error, namelen = 0;
3994
3995 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3996
3997 error = create_lkb(ls, &lkb);
3998 if (error)
3999 goto fail;
4000
4001 receive_flags(lkb, ms);
4002 set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
4003 error = receive_request_args(ls, lkb, ms);
4004 if (error) {
4005 __put_lkb(ls, lkb);
4006 goto fail;
4007 }
4008
4009 /* The dir node is the authority on whether we are the master
4010 for this rsb or not, so if the master sends us a request, we should
4011 recreate the rsb if we've destroyed it. This race happens when we
4012 send a remove message to the dir node at the same time that the dir
4013 node sends us a request for the rsb. */
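
	/* Roughly, with M the master (this node) and D the dir node:

	   M: send_remove(r)          ->
	                              <-  D: DLM_MSG_REQUEST for r
	   M: receive_request(): rsb was being removed here, recreate it as master
	   D: receive_remove(): rsb is active again, the remove is ignored
	*/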
4014
4015 namelen = receive_extralen(ms);
4016
4017 error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4018 R_RECEIVE_REQUEST, &r);
4019 if (error) {
4020 __put_lkb(ls, lkb);
4021 goto fail;
4022 }
4023
4024 lock_rsb(r);
4025
4026 if (r->res_master_nodeid != dlm_our_nodeid()) {
4027 error = validate_master_nodeid(ls, r, from_nodeid);
4028 if (error) {
4029 unlock_rsb(r);
4030 put_rsb(r);
4031 __put_lkb(ls, lkb);
4032 goto fail;
4033 }
4034 }
4035
4036 attach_lkb(r, lkb);
4037 error = do_request(r, lkb);
4038 send_request_reply(r, lkb, error);
4039 do_request_effects(r, lkb, error);
4040
4041 unlock_rsb(r);
4042 put_rsb(r);
4043
4044 if (error == -EINPROGRESS)
4045 error = 0;
4046 if (error)
4047 dlm_put_lkb(lkb);
4048 return 0;
4049
4050 fail:
4051 /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4052 and do this receive_request again from process_lookup_list once
4053 we get the lookup reply. This would avoid many repeated
4054 ENOTBLK request failures when the lookup reply designating us
4055 as master is delayed. */
4056
4057 if (error != -ENOTBLK) {
4058 log_limit(ls, "receive_request %x from %d %d",
4059 le32_to_cpu(ms->m_lkid), from_nodeid, error);
4060 }
4061
4062 setup_local_lkb(ls, ms);
4063 send_request_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4064 return error;
4065 }
4066
4067 static int receive_convert(struct dlm_ls *ls, const struct dlm_message *ms)
4068 {
4069 struct dlm_lkb *lkb;
4070 struct dlm_rsb *r;
4071 int error, reply = 1;
4072
4073 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4074 if (error)
4075 goto fail;
4076
4077 if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4078 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4079 "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4080 (unsigned long long)lkb->lkb_recover_seq,
4081 le32_to_cpu(ms->m_header.h_nodeid),
4082 le32_to_cpu(ms->m_lkid));
4083 error = -ENOENT;
4084 dlm_put_lkb(lkb);
4085 goto fail;
4086 }
4087
4088 r = lkb->lkb_resource;
4089
4090 hold_rsb(r);
4091 lock_rsb(r);
4092
4093 error = validate_message(lkb, ms);
4094 if (error)
4095 goto out;
4096
4097 receive_flags(lkb, ms);
4098
4099 error = receive_convert_args(ls, lkb, ms);
4100 if (error) {
4101 send_convert_reply(r, lkb, error);
4102 goto out;
4103 }
4104
4105 reply = !down_conversion(lkb);
4106
4107 error = do_convert(r, lkb);
4108 if (reply)
4109 send_convert_reply(r, lkb, error);
4110 do_convert_effects(r, lkb, error);
4111 out:
4112 unlock_rsb(r);
4113 put_rsb(r);
4114 dlm_put_lkb(lkb);
4115 return 0;
4116
4117 fail:
4118 setup_local_lkb(ls, ms);
4119 send_convert_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4120 return error;
4121 }
4122
4123 static int receive_unlock(struct dlm_ls *ls, const struct dlm_message *ms)
4124 {
4125 struct dlm_lkb *lkb;
4126 struct dlm_rsb *r;
4127 int error;
4128
4129 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4130 if (error)
4131 goto fail;
4132
4133 if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4134 log_error(ls, "receive_unlock %x remid %x remote %d %x",
4135 lkb->lkb_id, lkb->lkb_remid,
4136 le32_to_cpu(ms->m_header.h_nodeid),
4137 le32_to_cpu(ms->m_lkid));
4138 error = -ENOENT;
4139 dlm_put_lkb(lkb);
4140 goto fail;
4141 }
4142
4143 r = lkb->lkb_resource;
4144
4145 hold_rsb(r);
4146 lock_rsb(r);
4147
4148 error = validate_message(lkb, ms);
4149 if (error)
4150 goto out;
4151
4152 receive_flags(lkb, ms);
4153
4154 error = receive_unlock_args(ls, lkb, ms);
4155 if (error) {
4156 send_unlock_reply(r, lkb, error);
4157 goto out;
4158 }
4159
4160 error = do_unlock(r, lkb);
4161 send_unlock_reply(r, lkb, error);
4162 do_unlock_effects(r, lkb, error);
4163 out:
4164 unlock_rsb(r);
4165 put_rsb(r);
4166 dlm_put_lkb(lkb);
4167 return 0;
4168
4169 fail:
4170 setup_local_lkb(ls, ms);
4171 send_unlock_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4172 return error;
4173 }
4174
4175 static int receive_cancel(struct dlm_ls *ls, const struct dlm_message *ms)
4176 {
4177 struct dlm_lkb *lkb;
4178 struct dlm_rsb *r;
4179 int error;
4180
4181 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4182 if (error)
4183 goto fail;
4184
4185 receive_flags(lkb, ms);
4186
4187 r = lkb->lkb_resource;
4188
4189 hold_rsb(r);
4190 lock_rsb(r);
4191
4192 error = validate_message(lkb, ms);
4193 if (error)
4194 goto out;
4195
4196 error = do_cancel(r, lkb);
4197 send_cancel_reply(r, lkb, error);
4198 do_cancel_effects(r, lkb, error);
4199 out:
4200 unlock_rsb(r);
4201 put_rsb(r);
4202 dlm_put_lkb(lkb);
4203 return 0;
4204
4205 fail:
4206 setup_local_lkb(ls, ms);
4207 send_cancel_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4208 return error;
4209 }
4210
4211 static int receive_grant(struct dlm_ls *ls, const struct dlm_message *ms)
4212 {
4213 struct dlm_lkb *lkb;
4214 struct dlm_rsb *r;
4215 int error;
4216
4217 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4218 if (error)
4219 return error;
4220
4221 r = lkb->lkb_resource;
4222
4223 hold_rsb(r);
4224 lock_rsb(r);
4225
4226 error = validate_message(lkb, ms);
4227 if (error)
4228 goto out;
4229
4230 receive_flags_reply(lkb, ms, false);
4231 if (is_altmode(lkb))
4232 munge_altmode(lkb, ms);
4233 grant_lock_pc(r, lkb, ms);
4234 queue_cast(r, lkb, 0);
4235 out:
4236 unlock_rsb(r);
4237 put_rsb(r);
4238 dlm_put_lkb(lkb);
4239 return 0;
4240 }
4241
4242 static int receive_bast(struct dlm_ls *ls, const struct dlm_message *ms)
4243 {
4244 struct dlm_lkb *lkb;
4245 struct dlm_rsb *r;
4246 int error;
4247
4248 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4249 if (error)
4250 return error;
4251
4252 r = lkb->lkb_resource;
4253
4254 hold_rsb(r);
4255 lock_rsb(r);
4256
4257 error = validate_message(lkb, ms);
4258 if (error)
4259 goto out;
4260
4261 queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode));
4262 lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
4263 out:
4264 unlock_rsb(r);
4265 put_rsb(r);
4266 dlm_put_lkb(lkb);
4267 return 0;
4268 }
4269
4270 static void receive_lookup(struct dlm_ls *ls, const struct dlm_message *ms)
4271 {
4272 int len, error, ret_nodeid, from_nodeid, our_nodeid;
4273
4274 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4275 our_nodeid = dlm_our_nodeid();
4276
4277 len = receive_extralen(ms);
4278
4279 error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4280 &ret_nodeid, NULL);
4281
4282 /* Optimization: we're master so treat lookup as a request */
4283 if (!error && ret_nodeid == our_nodeid) {
4284 receive_request(ls, ms);
4285 return;
4286 }
4287 send_lookup_reply(ls, ms, ret_nodeid, error);
4288 }
4289
4290 static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
4291 {
4292 char name[DLM_RESNAME_MAXLEN+1];
4293 struct dlm_rsb *r;
4294 int rv, len, dir_nodeid, from_nodeid;
4295
4296 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4297
4298 len = receive_extralen(ms);
4299
4300 if (len > DLM_RESNAME_MAXLEN) {
4301 log_error(ls, "receive_remove from %d bad len %d",
4302 from_nodeid, len);
4303 return;
4304 }
4305
4306 dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash));
4307 if (dir_nodeid != dlm_our_nodeid()) {
4308 log_error(ls, "receive_remove from %d bad nodeid %d",
4309 from_nodeid, dir_nodeid);
4310 return;
4311 }
4312
4313 /*
4314 * Look for an inactive rsb; if it's there, free it.
4315 * If the rsb is active, it's being used, and we should ignore this
4316 * message. This is an expected race between the dir node sending a
4317 * request to the master node at the same time as the master node sends
4318 * a remove to the dir node. The resolution to that race is for the
4319 * dir node to ignore the remove message, and the master node to
4320 * recreate the master rsb when it gets a request from the dir node for
4321 * an rsb it doesn't have.
4322 */
4323
4324 memset(name, 0, sizeof(name));
4325 memcpy(name, ms->m_extra, len);
4326
4327 rcu_read_lock();
4328 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
4329 if (rv) {
4330 rcu_read_unlock();
4331 /* should not happen */
4332 log_error(ls, "%s from %d not found %s", __func__,
4333 from_nodeid, name);
4334 return;
4335 }
4336
4337 write_lock_bh(&ls->ls_rsbtbl_lock);
4338 if (!rsb_flag(r, RSB_HASHED)) {
4339 rcu_read_unlock();
4340 write_unlock_bh(&ls->ls_rsbtbl_lock);
4341 /* should not happen */
4342 log_error(ls, "%s from %d got removed during removal %s",
4343 __func__, from_nodeid, name);
4344 return;
4345 }
4346 /* at this stage the rsb can only be freed here */
4347 rcu_read_unlock();
4348
4349 if (!rsb_flag(r, RSB_INACTIVE)) {
4350 if (r->res_master_nodeid != from_nodeid) {
4351 /* should not happen */
4352 log_error(ls, "receive_remove on active rsb from %d master %d",
4353 from_nodeid, r->res_master_nodeid);
4354 dlm_print_rsb(r);
4355 write_unlock_bh(&ls->ls_rsbtbl_lock);
4356 return;
4357 }
4358
4359 /* Ignore the remove message, see race comment above. */
4360
4361 log_debug(ls, "receive_remove from %d master %d first %x %s",
4362 from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4363 name);
4364 write_unlock_bh(&ls->ls_rsbtbl_lock);
4365 return;
4366 }
4367
4368 if (r->res_master_nodeid != from_nodeid) {
4369 log_error(ls, "receive_remove inactive from %d master %d",
4370 from_nodeid, r->res_master_nodeid);
4371 dlm_print_rsb(r);
4372 write_unlock_bh(&ls->ls_rsbtbl_lock);
4373 return;
4374 }
4375
4376 list_del(&r->res_slow_list);
4377 rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
4378 dlm_rhash_rsb_params);
4379 rsb_clear_flag(r, RSB_HASHED);
4380 write_unlock_bh(&ls->ls_rsbtbl_lock);
4381
4382 free_inactive_rsb(r);
4383 }
4384
4385 static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms)
4386 {
4387 do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
4388 }
4389
4390 static int receive_request_reply(struct dlm_ls *ls,
4391 const struct dlm_message *ms)
4392 {
4393 struct dlm_lkb *lkb;
4394 struct dlm_rsb *r;
4395 int error, mstype, result;
4396 int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4397
4398 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4399 if (error)
4400 return error;
4401
4402 r = lkb->lkb_resource;
4403 hold_rsb(r);
4404 lock_rsb(r);
4405
4406 error = validate_message(lkb, ms);
4407 if (error)
4408 goto out;
4409
4410 mstype = lkb->lkb_wait_type;
4411 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4412 if (error) {
4413 log_error(ls, "receive_request_reply %x remote %d %x result %d",
4414 lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid),
4415 from_dlm_errno(le32_to_cpu(ms->m_result)));
4416 dlm_dump_rsb(r);
4417 goto out;
4418 }
4419
4420 /* Optimization: the dir node was also the master, so it took our
4421 lookup as a request and sent request reply instead of lookup reply */
4422 if (mstype == DLM_MSG_LOOKUP) {
4423 r->res_master_nodeid = from_nodeid;
4424 r->res_nodeid = from_nodeid;
4425 lkb->lkb_nodeid = from_nodeid;
4426 }
4427
4428 /* this is the value returned from do_request() on the master */
4429 result = from_dlm_errno(le32_to_cpu(ms->m_result));
4430
4431 switch (result) {
4432 case -EAGAIN:
4433 /* request would block (be queued) on remote master */
4434 queue_cast(r, lkb, -EAGAIN);
4435 confirm_master(r, -EAGAIN);
4436 unhold_lkb(lkb); /* undoes create_lkb() */
4437 break;
4438
4439 case -EINPROGRESS:
4440 case 0:
4441 /* request was queued or granted on remote master */
4442 receive_flags_reply(lkb, ms, false);
4443 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
4444 if (is_altmode(lkb))
4445 munge_altmode(lkb, ms);
4446 if (result) {
4447 add_lkb(r, lkb, DLM_LKSTS_WAITING);
4448 } else {
4449 grant_lock_pc(r, lkb, ms);
4450 queue_cast(r, lkb, 0);
4451 }
4452 confirm_master(r, result);
4453 break;
4454
4455 case -EBADR:
4456 case -ENOTBLK:
4457 /* find_rsb failed to find rsb or rsb wasn't master */
4458 log_limit(ls, "receive_request_reply %x from %d %d "
4459 "master %d dir %d first %x %s", lkb->lkb_id,
4460 from_nodeid, result, r->res_master_nodeid,
4461 r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4462
4463 if (r->res_dir_nodeid != dlm_our_nodeid() &&
4464 r->res_master_nodeid != dlm_our_nodeid()) {
4465 /* cause _request_lock->set_master->send_lookup */
4466 r->res_master_nodeid = 0;
4467 r->res_nodeid = -1;
4468 lkb->lkb_nodeid = -1;
4469 }
4470
4471 if (is_overlap(lkb)) {
4472 /* we'll ignore error in cancel/unlock reply */
4473 queue_cast_overlap(r, lkb);
4474 confirm_master(r, result);
4475 unhold_lkb(lkb); /* undoes create_lkb() */
4476 } else {
4477 _request_lock(r, lkb);
4478
4479 if (r->res_master_nodeid == dlm_our_nodeid())
4480 confirm_master(r, 0);
4481 }
4482 break;
4483
4484 default:
4485 log_error(ls, "receive_request_reply %x error %d",
4486 lkb->lkb_id, result);
4487 }
4488
4489 if ((result == 0 || result == -EINPROGRESS) &&
4490 test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
4491 log_debug(ls, "receive_request_reply %x result %d unlock",
4492 lkb->lkb_id, result);
4493 clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4494 send_unlock(r, lkb);
4495 } else if ((result == -EINPROGRESS) &&
4496 test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
4497 &lkb->lkb_iflags)) {
4498 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4499 clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4500 send_cancel(r, lkb);
4501 } else {
4502 clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4503 clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4504 }
4505 out:
4506 unlock_rsb(r);
4507 put_rsb(r);
4508 dlm_put_lkb(lkb);
4509 return 0;
4510 }
4511
4512 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4513 const struct dlm_message *ms, bool local)
4514 {
4515 /* this is the value returned from do_convert() on the master */
4516 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4517 case -EAGAIN:
4518 /* convert would block (be queued) on remote master */
4519 queue_cast(r, lkb, -EAGAIN);
4520 break;
4521
4522 case -EDEADLK:
4523 receive_flags_reply(lkb, ms, local);
4524 revert_lock_pc(r, lkb);
4525 queue_cast(r, lkb, -EDEADLK);
4526 break;
4527
4528 case -EINPROGRESS:
4529 /* convert was queued on remote master */
4530 receive_flags_reply(lkb, ms, local);
4531 if (is_demoted(lkb))
4532 munge_demoted(lkb);
4533 del_lkb(r, lkb);
4534 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4535 break;
4536
4537 case 0:
4538 /* convert was granted on remote master */
4539 receive_flags_reply(lkb, ms, local);
4540 if (is_demoted(lkb))
4541 munge_demoted(lkb);
4542 grant_lock_pc(r, lkb, ms);
4543 queue_cast(r, lkb, 0);
4544 break;
4545
4546 default:
4547 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4548 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4549 le32_to_cpu(ms->m_lkid),
4550 from_dlm_errno(le32_to_cpu(ms->m_result)));
4551 dlm_print_rsb(r);
4552 dlm_print_lkb(lkb);
4553 }
4554 }
4555
4556 static void _receive_convert_reply(struct dlm_lkb *lkb,
4557 const struct dlm_message *ms, bool local)
4558 {
4559 struct dlm_rsb *r = lkb->lkb_resource;
4560 int error;
4561
4562 hold_rsb(r);
4563 lock_rsb(r);
4564
4565 error = validate_message(lkb, ms);
4566 if (error)
4567 goto out;
4568
4569 error = remove_from_waiters_ms(lkb, ms, local);
4570 if (error)
4571 goto out;
4572
4573 __receive_convert_reply(r, lkb, ms, local);
4574 out:
4575 unlock_rsb(r);
4576 put_rsb(r);
4577 }
4578
4579 static int receive_convert_reply(struct dlm_ls *ls,
4580 const struct dlm_message *ms)
4581 {
4582 struct dlm_lkb *lkb;
4583 int error;
4584
4585 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4586 if (error)
4587 return error;
4588
4589 _receive_convert_reply(lkb, ms, false);
4590 dlm_put_lkb(lkb);
4591 return 0;
4592 }
4593
4594 static void _receive_unlock_reply(struct dlm_lkb *lkb,
4595 const struct dlm_message *ms, bool local)
4596 {
4597 struct dlm_rsb *r = lkb->lkb_resource;
4598 int error;
4599
4600 hold_rsb(r);
4601 lock_rsb(r);
4602
4603 error = validate_message(lkb, ms);
4604 if (error)
4605 goto out;
4606
4607 error = remove_from_waiters_ms(lkb, ms, local);
4608 if (error)
4609 goto out;
4610
4611 /* this is the value returned from do_unlock() on the master */
4612
4613 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4614 case -DLM_EUNLOCK:
4615 receive_flags_reply(lkb, ms, local);
4616 remove_lock_pc(r, lkb);
4617 queue_cast(r, lkb, -DLM_EUNLOCK);
4618 break;
4619 case -ENOENT:
4620 break;
4621 default:
4622 log_error(r->res_ls, "receive_unlock_reply %x error %d",
4623 lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
4624 }
4625 out:
4626 unlock_rsb(r);
4627 put_rsb(r);
4628 }
4629
4630 static int receive_unlock_reply(struct dlm_ls *ls,
4631 const struct dlm_message *ms)
4632 {
4633 struct dlm_lkb *lkb;
4634 int error;
4635
4636 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4637 if (error)
4638 return error;
4639
4640 _receive_unlock_reply(lkb, ms, false);
4641 dlm_put_lkb(lkb);
4642 return 0;
4643 }
4644
4645 static void _receive_cancel_reply(struct dlm_lkb *lkb,
4646 const struct dlm_message *ms, bool local)
4647 {
4648 struct dlm_rsb *r = lkb->lkb_resource;
4649 int error;
4650
4651 hold_rsb(r);
4652 lock_rsb(r);
4653
4654 error = validate_message(lkb, ms);
4655 if (error)
4656 goto out;
4657
4658 error = remove_from_waiters_ms(lkb, ms, local);
4659 if (error)
4660 goto out;
4661
4662 /* this is the value returned from do_cancel() on the master */
4663
4664 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4665 case -DLM_ECANCEL:
4666 receive_flags_reply(lkb, ms, local);
4667 revert_lock_pc(r, lkb);
4668 queue_cast(r, lkb, -DLM_ECANCEL);
4669 break;
4670 case 0:
4671 break;
4672 default:
4673 log_error(r->res_ls, "receive_cancel_reply %x error %d",
4674 lkb->lkb_id,
4675 from_dlm_errno(le32_to_cpu(ms->m_result)));
4676 }
4677 out:
4678 unlock_rsb(r);
4679 put_rsb(r);
4680 }
4681
4682 static int receive_cancel_reply(struct dlm_ls *ls,
4683 const struct dlm_message *ms)
4684 {
4685 struct dlm_lkb *lkb;
4686 int error;
4687
4688 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4689 if (error)
4690 return error;
4691
4692 _receive_cancel_reply(lkb, ms, false);
4693 dlm_put_lkb(lkb);
4694 return 0;
4695 }
4696
4697 static void receive_lookup_reply(struct dlm_ls *ls,
4698 const struct dlm_message *ms)
4699 {
4700 struct dlm_lkb *lkb;
4701 struct dlm_rsb *r;
4702 int error, ret_nodeid;
4703 int do_lookup_list = 0;
4704
4705 error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
4706 if (error) {
4707 log_error(ls, "%s no lkid %x", __func__,
4708 le32_to_cpu(ms->m_lkid));
4709 return;
4710 }
4711
4712 /* ms->m_result is the value returned by dlm_master_lookup on dir node
4713 FIXME: will a non-zero error ever be returned? */
4714
4715 r = lkb->lkb_resource;
4716 hold_rsb(r);
4717 lock_rsb(r);
4718
4719 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4720 if (error)
4721 goto out;
4722
4723 ret_nodeid = le32_to_cpu(ms->m_nodeid);
4724
4725 /* We sometimes receive a request from the dir node for this
4726 rsb before we've received the dir node's lookup_reply for it.
4727 The request from the dir node implies we're the master, so we set
4728 ourselves as master in receive_request_reply, and verify here that
4729 we are indeed the master. */
4730
4731 if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4732 /* This should never happen */
4733 log_error(ls, "receive_lookup_reply %x from %d ret %d "
4734 "master %d dir %d our %d first %x %s",
4735 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4736 ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
4737 dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4738 }
4739
4740 if (ret_nodeid == dlm_our_nodeid()) {
4741 r->res_master_nodeid = ret_nodeid;
4742 r->res_nodeid = 0;
4743 do_lookup_list = 1;
4744 r->res_first_lkid = 0;
4745 } else if (ret_nodeid == -1) {
4746 /* the remote node doesn't believe it's the dir node */
4747 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4748 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
4749 r->res_master_nodeid = 0;
4750 r->res_nodeid = -1;
4751 lkb->lkb_nodeid = -1;
4752 } else {
4753 /* set_master() will set lkb_nodeid from r */
4754 r->res_master_nodeid = ret_nodeid;
4755 r->res_nodeid = ret_nodeid;
4756 }
4757
4758 if (is_overlap(lkb)) {
4759 log_debug(ls, "receive_lookup_reply %x unlock %x",
4760 lkb->lkb_id, dlm_iflags_val(lkb));
4761 queue_cast_overlap(r, lkb);
4762 unhold_lkb(lkb); /* undoes create_lkb() */
4763 goto out_list;
4764 }
4765
4766 _request_lock(r, lkb);
4767
4768 out_list:
4769 if (do_lookup_list)
4770 process_lookup_list(r);
4771 out:
4772 unlock_rsb(r);
4773 put_rsb(r);
4774 dlm_put_lkb(lkb);
4775 }
4776
4777 static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4778 uint32_t saved_seq)
4779 {
4780 int error = 0, noent = 0;
4781
4782 if (WARN_ON_ONCE(!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid)))) {
4783 log_limit(ls, "receive %d from non-member %d %x %x %d",
4784 le32_to_cpu(ms->m_type),
4785 le32_to_cpu(ms->m_header.h_nodeid),
4786 le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4787 from_dlm_errno(le32_to_cpu(ms->m_result)));
4788 return;
4789 }
4790
4791 switch (ms->m_type) {
4792
4793 /* messages sent to a master node */
4794
4795 case cpu_to_le32(DLM_MSG_REQUEST):
4796 error = receive_request(ls, ms);
4797 break;
4798
4799 case cpu_to_le32(DLM_MSG_CONVERT):
4800 error = receive_convert(ls, ms);
4801 break;
4802
4803 case cpu_to_le32(DLM_MSG_UNLOCK):
4804 error = receive_unlock(ls, ms);
4805 break;
4806
4807 case cpu_to_le32(DLM_MSG_CANCEL):
4808 noent = 1;
4809 error = receive_cancel(ls, ms);
4810 break;
4811
4812 /* messages sent from a master node (replies to above) */
4813
4814 case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
4815 error = receive_request_reply(ls, ms);
4816 break;
4817
4818 case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
4819 error = receive_convert_reply(ls, ms);
4820 break;
4821
4822 case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
4823 error = receive_unlock_reply(ls, ms);
4824 break;
4825
4826 case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
4827 error = receive_cancel_reply(ls, ms);
4828 break;
4829
4830 /* messages sent from a master node (only two types of async msg) */
4831
4832 case cpu_to_le32(DLM_MSG_GRANT):
4833 noent = 1;
4834 error = receive_grant(ls, ms);
4835 break;
4836
4837 case cpu_to_le32(DLM_MSG_BAST):
4838 noent = 1;
4839 error = receive_bast(ls, ms);
4840 break;
4841
4842 /* messages sent to a dir node */
4843
4844 case cpu_to_le32(DLM_MSG_LOOKUP):
4845 receive_lookup(ls, ms);
4846 break;
4847
4848 case cpu_to_le32(DLM_MSG_REMOVE):
4849 receive_remove(ls, ms);
4850 break;
4851
4852 /* messages sent from a dir node (remove has no reply) */
4853
4854 case cpu_to_le32(DLM_MSG_LOOKUP_REPLY):
4855 receive_lookup_reply(ls, ms);
4856 break;
4857
4858 /* other messages */
4859
4860 case cpu_to_le32(DLM_MSG_PURGE):
4861 receive_purge(ls, ms);
4862 break;
4863
4864 default:
4865 log_error(ls, "unknown message type %d",
4866 le32_to_cpu(ms->m_type));
4867 }
4868
4869 /*
4870 * When checking for ENOENT, we're checking the result of
4871 * find_lkb(m_remid):
4872 *
4873 * The lock id referenced in the message wasn't found. This may
4874 * happen in normal usage for the async messages and cancel, so
4875 * only use log_debug for them.
4876 *
4877 * Some errors are expected and normal.
4878 */
4879
4880 if (error == -ENOENT && noent) {
4881 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4882 le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4883 le32_to_cpu(ms->m_header.h_nodeid),
4884 le32_to_cpu(ms->m_lkid), saved_seq);
4885 } else if (error == -ENOENT) {
4886 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4887 le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4888 le32_to_cpu(ms->m_header.h_nodeid),
4889 le32_to_cpu(ms->m_lkid), saved_seq);
4890
4891 if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT))
4892 dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash));
4893 }
4894
4895 if (error == -EINVAL) {
4896 log_error(ls, "receive %d inval from %d lkid %x remid %x "
4897 "saved_seq %u",
4898 le32_to_cpu(ms->m_type),
4899 le32_to_cpu(ms->m_header.h_nodeid),
4900 le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4901 saved_seq);
4902 }
4903 }
4904
4905 /* If the lockspace is in recovery mode (locking stopped), then normal
4906 messages are saved on the requestqueue for processing after recovery is
4907 done. When not in recovery mode, we wait for dlm_recoverd to drain saved
4908 messages off the requestqueue before we process new ones. This occurs right
4909 after recovery completes when we transition from saving all messages on
4910 requestqueue, to processing all the saved messages, to processing new
4911 messages as they arrive. */
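
/* Informal timeline of the above:

	recovery running  ->  dlm_receive_message() saves incoming messages on
	                      the requestqueue (LSFL_RECV_MSG_BLOCKED set)
	recovery done     ->  dlm_recoverd drains the saved messages through
	                      dlm_receive_message_saved()
	queue drained     ->  new messages are handled directly by
	                      _receive_message()
*/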
4912
4913 static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4914 int nodeid)
4915 {
4916 try_again:
4917 read_lock_bh(&ls->ls_requestqueue_lock);
4918 if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4919 /* If we were a member of this lockspace, left, and rejoined,
4920 other nodes may still be sending us messages from the
4921 lockspace generation before we left. */
4922 if (WARN_ON_ONCE(!ls->ls_generation)) {
4923 read_unlock_bh(&ls->ls_requestqueue_lock);
4924 log_limit(ls, "receive %d from %d ignore old gen",
4925 le32_to_cpu(ms->m_type), nodeid);
4926 return;
4927 }
4928
4929 read_unlock_bh(&ls->ls_requestqueue_lock);
4930 write_lock_bh(&ls->ls_requestqueue_lock);
4931 /* recheck because we hold writelock now */
4932 if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4933 write_unlock_bh(&ls->ls_requestqueue_lock);
4934 goto try_again;
4935 }
4936
4937 dlm_add_requestqueue(ls, nodeid, ms);
4938 write_unlock_bh(&ls->ls_requestqueue_lock);
4939 } else {
4940 _receive_message(ls, ms, 0);
4941 read_unlock_bh(&ls->ls_requestqueue_lock);
4942 }
4943 }
4944
4945 /* This is called by dlm_recoverd to process messages that were saved on
4946 the requestqueue. */
4947
4948 void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
4949 uint32_t saved_seq)
4950 {
4951 _receive_message(ls, ms, saved_seq);
4952 }
4953
4954 /* This is called by the midcomms layer when something is received for
4955 the lockspace. It could be either a MSG (normal message sent as part of
4956 standard locking activity) or an RCOM (recovery message sent as part of
4957 lockspace recovery). */
4958
4959 void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
4960 {
4961 const struct dlm_header *hd = &p->header;
4962 struct dlm_ls *ls;
4963 int type = 0;
4964
4965 switch (hd->h_cmd) {
4966 case DLM_MSG:
4967 type = le32_to_cpu(p->message.m_type);
4968 break;
4969 case DLM_RCOM:
4970 type = le32_to_cpu(p->rcom.rc_type);
4971 break;
4972 default:
4973 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4974 return;
4975 }
4976
4977 if (le32_to_cpu(hd->h_nodeid) != nodeid) {
4978 log_print("invalid h_nodeid %d from %d lockspace %x",
4979 le32_to_cpu(hd->h_nodeid), nodeid,
4980 le32_to_cpu(hd->u.h_lockspace));
4981 return;
4982 }
4983
4984 ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace));
4985 if (!ls) {
4986 if (dlm_config.ci_log_debug) {
4987 printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
4988 "%u from %d cmd %d type %d\n",
4989 le32_to_cpu(hd->u.h_lockspace), nodeid,
4990 hd->h_cmd, type);
4991 }
4992
4993 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4994 dlm_send_ls_not_ready(nodeid, &p->rcom);
4995 return;
4996 }
4997
4998 /* this rwlock allows dlm_ls_stop() to wait for all dlm_recv threads to
4999 be inactive (in this ls) before transitioning to recovery mode */
5000
5001 read_lock_bh(&ls->ls_recv_active);
5002 if (hd->h_cmd == DLM_MSG)
5003 dlm_receive_message(ls, &p->message, nodeid);
5004 else if (hd->h_cmd == DLM_RCOM)
5005 dlm_receive_rcom(ls, &p->rcom, nodeid);
5006 else
5007 log_error(ls, "invalid h_cmd %d from %d lockspace %x",
5008 hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
5009 read_unlock_bh(&ls->ls_recv_active);
5010
5011 dlm_put_lockspace(ls);
5012 }
5013
5014 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
5015 struct dlm_message *ms_local)
5016 {
5017 if (middle_conversion(lkb)) {
5018 log_rinfo(ls, "%s %x middle convert in progress", __func__,
5019 lkb->lkb_id);
5020
5021 /* We sent this lock to the new master. The new master will
5022 * tell us when it's granted. We no longer need a reply, so
5023 * use a fake reply to put the lkb into the right state.
5024 */
5025 hold_lkb(lkb);
5026 memset(ms_local, 0, sizeof(struct dlm_message));
5027 ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
5028 ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
5029 ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5030 _receive_convert_reply(lkb, ms_local, true);
5031 unhold_lkb(lkb);
5032
5033 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
5034 set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5035 }
5036
5037 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5038 conversions are async; there's no reply from the remote master */
5039 }
5040
5041 /* A waiting lkb needs recovery if the master node has failed, or
5042 the master node is changing (only when no directory is used) */
5043
5044 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5045 int dir_nodeid)
5046 {
5047 if (dlm_no_directory(ls))
5048 return 1;
5049
5050 if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5051 return 1;
5052
5053 return 0;
5054 }
5055
5056 /* Recovery for locks that are waiting for replies from nodes that are now
5057 gone. We can just complete unlocks and cancels by faking a reply from the
5058 dead node. Requests and up-conversions we flag to be resent after
5059 recovery. Down-conversions can just be completed with a fake reply like
5060 unlocks. Conversions between PR and CW need special attention. */
5061
5062 void dlm_recover_waiters_pre(struct dlm_ls *ls)
5063 {
5064 struct dlm_lkb *lkb, *safe;
5065 struct dlm_message *ms_local;
5066 int wait_type, local_unlock_result, local_cancel_result;
5067 int dir_nodeid;
5068
5069 ms_local = kmalloc(sizeof(*ms_local), GFP_KERNEL);
5070 if (!ms_local)
5071 return;
5072
5073 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5074
5075 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5076
5077 /* exclude debug messages about unlocks because there can be so
5078 many and they aren't very interesting */
5079
5080 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5081 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5082 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5083 lkb->lkb_id,
5084 lkb->lkb_remid,
5085 lkb->lkb_wait_type,
5086 lkb->lkb_resource->res_nodeid,
5087 lkb->lkb_nodeid,
5088 lkb->lkb_wait_nodeid,
5089 dir_nodeid);
5090 }
5091
5092 /* all outstanding lookups, regardless of destination, will be
5093 resent after recovery is done */
5094
5095 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5096 set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5097 continue;
5098 }
5099
5100 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5101 continue;
5102
5103 wait_type = lkb->lkb_wait_type;
5104 local_unlock_result = -DLM_EUNLOCK;
5105 local_cancel_result = -DLM_ECANCEL;
5106
5107 /* Main reply may have been received leaving a zero wait_type,
5108 but a reply for the overlapping op may not have been
5109 received. In that case we need to fake the appropriate
5110 reply for the overlap op. */
5111
5112 if (!wait_type) {
5113 if (is_overlap_cancel(lkb)) {
5114 wait_type = DLM_MSG_CANCEL;
5115 if (lkb->lkb_grmode == DLM_LOCK_IV)
5116 local_cancel_result = 0;
5117 }
5118 if (is_overlap_unlock(lkb)) {
5119 wait_type = DLM_MSG_UNLOCK;
5120 if (lkb->lkb_grmode == DLM_LOCK_IV)
5121 local_unlock_result = -ENOENT;
5122 }
5123
5124 log_debug(ls, "rwpre overlap %x %x %d %d %d",
5125 lkb->lkb_id, dlm_iflags_val(lkb), wait_type,
5126 local_cancel_result, local_unlock_result);
5127 }
5128
5129 switch (wait_type) {
5130
5131 case DLM_MSG_REQUEST:
5132 set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5133 break;
5134
5135 case DLM_MSG_CONVERT:
5136 recover_convert_waiter(ls, lkb, ms_local);
5137 break;
5138
5139 case DLM_MSG_UNLOCK:
5140 hold_lkb(lkb);
5141 memset(ms_local, 0, sizeof(struct dlm_message));
5142 ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
5143 ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result));
5144 ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5145 _receive_unlock_reply(lkb, ms_local, true);
5146 dlm_put_lkb(lkb);
5147 break;
5148
5149 case DLM_MSG_CANCEL:
5150 hold_lkb(lkb);
5151 memset(ms_local, 0, sizeof(struct dlm_message));
5152 ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
5153 ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result));
5154 ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5155 _receive_cancel_reply(lkb, ms_local, true);
5156 dlm_put_lkb(lkb);
5157 break;
5158
5159 default:
5160 log_error(ls, "invalid lkb wait_type %d %d",
5161 lkb->lkb_wait_type, wait_type);
5162 }
5163 schedule();
5164 }
5165 kfree(ms_local);
5166 }
5167
5168 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5169 {
5170 struct dlm_lkb *lkb = NULL, *iter;
5171
5172 spin_lock_bh(&ls->ls_waiters_lock);
5173 list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
5174 if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
5175 hold_lkb(iter);
5176 lkb = iter;
5177 break;
5178 }
5179 }
5180 spin_unlock_bh(&ls->ls_waiters_lock);
5181
5182 return lkb;
5183 }
5184
5185 /*
5186 * Forced state reset for locks that were in the middle of remote operations
5187 * when recovery happened (i.e. lkbs that were on the waiters list, waiting
5188 * for a reply from a remote operation.) The lkbs remaining on the waiters
5189 * list need to be reevaluated; some may need resending to a different node
5190 * than previously, and some may now need local handling rather than remote.
5191 *
5192 * First, the lkb state for the voided remote operation is forcibly reset,
5193 * equivalent to what remove_from_waiters() would normally do:
5194 * . lkb removed from ls_waiters list
5195 * . lkb wait_type cleared
5196 * . lkb waiters_count cleared
5197 * . lkb ref count decremented for each waiters_count (almost always 1,
5198 * but possibly 2 in case of cancel/unlock overlapping, which means
5199 * two remote replies were being expected for the lkb.)
5200 *
5201 * Second, the lkb is reprocessed like an original operation would be,
5202 * by passing it to _request_lock or _convert_lock, which will either
5203 * process the lkb operation locally, or send it to a remote node again
5204 * and put the lkb back onto the waiters list.
5205 *
5206 * When reprocessing the lkb, we may find that it's flagged for an overlapping
5207 * force-unlock or cancel, either from before recovery began, or after recovery
5208 * finished. If this is the case, the unlock/cancel is done directly, and the
5209 * original operation is not initiated again (no _request_lock/_convert_lock.)
5210 */
5211
5212 int dlm_recover_waiters_post(struct dlm_ls *ls)
5213 {
5214 struct dlm_lkb *lkb;
5215 struct dlm_rsb *r;
5216 int error = 0, mstype, err, oc, ou;
5217
5218 while (1) {
5219 if (dlm_locking_stopped(ls)) {
5220 log_debug(ls, "recover_waiters_post aborted");
5221 error = -EINTR;
5222 break;
5223 }
5224
5225 /*
5226 * Find an lkb from the waiters list that's been affected by
5227 * recovery node changes, and needs to be reprocessed. Does
5228 * hold_lkb(), adding a refcount.
5229 */
5230 lkb = find_resend_waiter(ls);
5231 if (!lkb)
5232 break;
5233
5234 r = lkb->lkb_resource;
5235 hold_rsb(r);
5236 lock_rsb(r);
5237
5238 /*
5239 * If the lkb has been flagged for a force unlock or cancel,
5240 * then the reprocessing below will be replaced by just doing
5241 * the unlock/cancel directly.
5242 */
5243 mstype = lkb->lkb_wait_type;
5244 oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
5245 &lkb->lkb_iflags);
5246 ou = test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT,
5247 &lkb->lkb_iflags);
5248 err = 0;
5249
5250 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5251 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5252 "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5253 r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5254 dlm_dir_nodeid(r), oc, ou);
5255
5256 /*
5257 * No reply to the pre-recovery operation will now be received,
5258 * so a forced equivalent of remove_from_waiters() is needed to
5259 * reset the waiters state that was in place before recovery.
5260 */
5261
5262 clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5263
5264 /* Forcibly clear wait_type */
5265 lkb->lkb_wait_type = 0;
5266
5267 /*
5268 * Forcibly reset wait_count and associated refcount. The
5269 * wait_count will almost always be 1, but in case of an
5270 * overlapping unlock/cancel it could be 2: see where
5271 * add_to_waiters() finds the lkb is already on the waiters
5272 * list and does lkb_wait_count++; hold_lkb().
5273 */
5274 while (lkb->lkb_wait_count) {
5275 lkb->lkb_wait_count--;
5276 unhold_lkb(lkb);
5277 }
5278
5279 /* Forcibly remove from waiters list */
5280 spin_lock_bh(&ls->ls_waiters_lock);
5281 list_del_init(&lkb->lkb_wait_reply);
5282 spin_unlock_bh(&ls->ls_waiters_lock);
5283
5284 /*
5285 * The lkb is now clear of all prior waiters state and can be
5286 * processed locally, or sent to remote node again, or directly
5287 * cancelled/unlocked.
5288 */
5289
5290 if (oc || ou) {
5291 /* do an unlock or cancel instead of resending */
5292 switch (mstype) {
5293 case DLM_MSG_LOOKUP:
5294 case DLM_MSG_REQUEST:
5295 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5296 -DLM_ECANCEL);
5297 unhold_lkb(lkb); /* undoes create_lkb() */
5298 break;
5299 case DLM_MSG_CONVERT:
5300 if (oc) {
5301 queue_cast(r, lkb, -DLM_ECANCEL);
5302 } else {
5303 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5304 _unlock_lock(r, lkb);
5305 }
5306 break;
5307 default:
5308 err = 1;
5309 }
5310 } else {
5311 switch (mstype) {
5312 case DLM_MSG_LOOKUP:
5313 case DLM_MSG_REQUEST:
5314 _request_lock(r, lkb);
5315 if (r->res_nodeid != -1 && is_master(r))
5316 confirm_master(r, 0);
5317 break;
5318 case DLM_MSG_CONVERT:
5319 _convert_lock(r, lkb);
5320 break;
5321 default:
5322 err = 1;
5323 }
5324 }
5325
5326 if (err) {
5327 log_error(ls, "waiter %x msg %d r_nodeid %d "
5328 "dir_nodeid %d overlap %d %d",
5329 lkb->lkb_id, mstype, r->res_nodeid,
5330 dlm_dir_nodeid(r), oc, ou);
5331 }
5332 unlock_rsb(r);
5333 put_rsb(r);
5334 dlm_put_lkb(lkb);
5335 }
5336
5337 return error;
5338 }
5339
5340 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5341 struct list_head *list)
5342 {
5343 struct dlm_lkb *lkb, *safe;
5344
5345 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5346 if (!is_master_copy(lkb))
5347 continue;
5348
5349 /* don't purge lkbs we've added in recover_master_copy for
5350 the current recovery seq */
5351
5352 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5353 continue;
5354
5355 del_lkb(r, lkb);
5356
5357 /* this put should free the lkb */
5358 if (!dlm_put_lkb(lkb))
5359 log_error(ls, "purged mstcpy lkb not released");
5360 }
5361 }
5362
5363 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5364 {
5365 struct dlm_ls *ls = r->res_ls;
5366
5367 purge_mstcpy_list(ls, r, &r->res_grantqueue);
5368 purge_mstcpy_list(ls, r, &r->res_convertqueue);
5369 purge_mstcpy_list(ls, r, &r->res_waitqueue);
5370 }
5371
5372 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5373 struct list_head *list,
5374 int nodeid_gone, unsigned int *count)
5375 {
5376 struct dlm_lkb *lkb, *safe;
5377
5378 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5379 if (!is_master_copy(lkb))
5380 continue;
5381
5382 if ((lkb->lkb_nodeid == nodeid_gone) ||
5383 dlm_is_removed(ls, lkb->lkb_nodeid)) {
5384
5385 /* tell recover_lvb to invalidate the lvb
5386 because a node holding EX/PW failed */
5387 if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5388 (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5389 rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5390 }
5391
5392 del_lkb(r, lkb);
5393
5394 /* this put should free the lkb */
5395 if (!dlm_put_lkb(lkb))
5396 log_error(ls, "purged dead lkb not released");
5397
5398 rsb_set_flag(r, RSB_RECOVER_GRANT);
5399
5400 (*count)++;
5401 }
5402 }
5403 }
5404
5405 /* Get rid of locks held by nodes that are gone. */
5406
5407 void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list)
5408 {
5409 struct dlm_rsb *r;
5410 struct dlm_member *memb;
5411 int nodes_count = 0;
5412 int nodeid_gone = 0;
5413 unsigned int lkb_count = 0;
5414
5415 /* cache one removed nodeid to optimize the common
5416 case of a single node removed */
5417
5418 list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5419 nodes_count++;
5420 nodeid_gone = memb->nodeid;
5421 }
5422
5423 if (!nodes_count)
5424 return;
5425
5426 list_for_each_entry(r, root_list, res_root_list) {
5427 lock_rsb(r);
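		/* only an rsb we master holds copies (MSTCPY lkbs) of other nodes' locks */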
5428 if (r->res_nodeid != -1 && is_master(r)) {
5429 purge_dead_list(ls, r, &r->res_grantqueue,
5430 nodeid_gone, &lkb_count);
5431 purge_dead_list(ls, r, &r->res_convertqueue,
5432 nodeid_gone, &lkb_count);
5433 purge_dead_list(ls, r, &r->res_waitqueue,
5434 nodeid_gone, &lkb_count);
5435 }
5436 unlock_rsb(r);
5437
5438 cond_resched();
5439 }
5440
5441 if (lkb_count)
5442 log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5443 lkb_count, nodes_count);
5444 }
5445
5446 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
5447 {
5448 struct dlm_rsb *r;
5449
5450 read_lock_bh(&ls->ls_rsbtbl_lock);
5451 list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
5452 if (!rsb_flag(r, RSB_RECOVER_GRANT))
5453 continue;
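		/* we only grant on rsbs we master; a stale flag on a non-master rsb is simply cleared */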
5454 if (!is_master(r)) {
5455 rsb_clear_flag(r, RSB_RECOVER_GRANT);
5456 continue;
5457 }
5458 hold_rsb(r);
5459 read_unlock_bh(&ls->ls_rsbtbl_lock);
5460 return r;
5461 }
5462 read_unlock_bh(&ls->ls_rsbtbl_lock);
5463 return NULL;
5464 }
5465
5466 /*
5467 * Attempt to grant locks on resources that we are the master of.
5468 * Locks may have become grantable during recovery because locks
5469 * from departed nodes have been purged (or not rebuilt), allowing
5470 * previously blocked locks to now be granted. The subset of rsb's
5471 * we are interested in are those with lkb's on either the convert or
5472 * waiting queues.
5473 *
5474 * Simplest would be to go through each master rsb and check for non-empty
5475 * convert or waiting queues, and attempt to grant on those rsbs.
5476 * Checking the queues requires lock_rsb, though, for which we'd need
5477 * to release the rsbtbl lock. This would make iterating through all
5478 * rsb's very inefficient. So, we rely on earlier recovery routines
5479 * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5480 * locks for.
5481 */
5482
5483 void dlm_recover_grant(struct dlm_ls *ls)
5484 {
5485 struct dlm_rsb *r;
5486 unsigned int count = 0;
5487 unsigned int rsb_count = 0;
5488 unsigned int lkb_count = 0;
5489
5490 while (1) {
5491 r = find_grant_rsb(ls);
5492 if (!r)
5493 break;
5494
5495 rsb_count++;
5496 count = 0;
5497 lock_rsb(r);
5498 /* the RECOVER_GRANT flag is checked in the grant path */
5499 grant_pending_locks(r, &count);
5500 rsb_clear_flag(r, RSB_RECOVER_GRANT);
5501 lkb_count += count;
5502 confirm_master(r, 0);
5503 unlock_rsb(r);
5504 put_rsb(r);
5505 cond_resched();
5506 }
5507
5508 if (lkb_count)
5509 log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5510 lkb_count, rsb_count);
5511 }
5512
5513 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5514 uint32_t remid)
5515 {
5516 struct dlm_lkb *lkb;
5517
5518 list_for_each_entry(lkb, head, lkb_statequeue) {
5519 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5520 return lkb;
5521 }
5522 return NULL;
5523 }
5524
5525 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5526 uint32_t remid)
5527 {
5528 struct dlm_lkb *lkb;
5529
5530 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5531 if (lkb)
5532 return lkb;
5533 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5534 if (lkb)
5535 return lkb;
5536 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5537 if (lkb)
5538 return lkb;
5539 return NULL;
5540 }
5541
5542 /* needs at least dlm_rcom + rcom_lock */
5543 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5544 struct dlm_rsb *r, const struct dlm_rcom *rc)
5545 {
5546 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5547
5548 lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5549 lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5550 lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5551 lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5552 dlm_set_dflags_val(lkb, le32_to_cpu(rl->rl_flags));
5553 set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
5554 lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5555 lkb->lkb_rqmode = rl->rl_rqmode;
5556 lkb->lkb_grmode = rl->rl_grmode;
5557 /* don't set lkb_status because add_lkb wants to itself */
5558
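	/* record which callbacks the lock's owner requested; the fake_*fn pointers
	   act only as placeholders on a master copy */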
5559 lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5560 lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5561
5562 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5563 int lvblen = le16_to_cpu(rc->rc_header.h_length) -
5564 sizeof(struct dlm_rcom) - sizeof(struct rcom_lock);
5565 if (lvblen > ls->ls_lvblen)
5566 return -EINVAL;
5567 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5568 if (!lkb->lkb_lvbptr)
5569 return -ENOMEM;
5570 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5571 }
5572
5573 /* Conversions between PR and CW (middle modes) need special handling.
5574 The real granted mode of these converting locks cannot be determined
5575 until all locks have been rebuilt on the rsb (recover_conversion) */
5576
5577 if (rl->rl_status == DLM_LKSTS_CONVERT && middle_conversion(lkb)) {
5578 /* We may need to adjust grmode depending on other granted locks. */
5579 log_limit(ls, "%s %x middle convert gr %d rq %d remote %d %x",
5580 __func__, lkb->lkb_id, lkb->lkb_grmode,
5581 lkb->lkb_rqmode, lkb->lkb_nodeid, lkb->lkb_remid);
5582 rsb_set_flag(r, RSB_RECOVER_CONVERT);
5583 }
5584
5585 return 0;
5586 }
5587
5588 /* This lkb may have been recovered in a previous aborted recovery so we need
5589 to check if the rsb already has an lkb with the given remote nodeid/lkid.
5590 If so we just send back a standard reply. If not, we create a new lkb with
5591 the given values and send back our lkid. We send back our lkid by sending
5592 back the rcom_lock struct we got but with the remid field filled in. */
5593
5594 /* needs at least dlm_rcom + rcom_lock */
5595 int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5596 __le32 *rl_remid, __le32 *rl_result)
5597 {
5598 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5599 struct dlm_rsb *r;
5600 struct dlm_lkb *lkb;
5601 uint32_t remid = 0;
5602 int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5603 int error;
5604
5605 /* init rl_remid with rcom lock rl_remid */
5606 *rl_remid = rl->rl_remid;
5607
5608 if (rl->rl_parent_lkid) {
5609 error = -EOPNOTSUPP;
5610 goto out;
5611 }
5612
5613 remid = le32_to_cpu(rl->rl_lkid);
5614
5615 /* In general we expect the rsb returned to be R_MASTER, but we don't
5616 have to require it. Recovery of masters on one node can overlap
5617 recovery of locks on another node, so one node can send us MSTCPY
5618 locks before we've made ourselves master of this rsb. We can still
5619 add new MSTCPY locks that we receive here without any harm; when
5620 we make ourselves master, dlm_recover_masters() won't touch the
5621 MSTCPY locks we've received early. */
5622
5623 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5624 from_nodeid, R_RECEIVE_RECOVER, &r);
5625 if (error)
5626 goto out;
5627
5628 lock_rsb(r);
5629
5630 if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5631 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5632 from_nodeid, remid);
5633 error = -EBADR;
5634 goto out_unlock;
5635 }
5636
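	/* the remote lock may already exist here from an earlier, aborted
	   recovery; if so, just return our existing lkid */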
5637 lkb = search_remid(r, from_nodeid, remid);
5638 if (lkb) {
5639 error = -EEXIST;
5640 goto out_remid;
5641 }
5642
5643 error = create_lkb(ls, &lkb);
5644 if (error)
5645 goto out_unlock;
5646
5647 error = receive_rcom_lock_args(ls, lkb, r, rc);
5648 if (error) {
5649 __put_lkb(ls, lkb);
5650 goto out_unlock;
5651 }
5652
5653 attach_lkb(r, lkb);
5654 add_lkb(r, lkb, rl->rl_status);
5655 ls->ls_recover_locks_in++;
5656
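	/* locks left on the convert/wait queues may now be grantable;
	   flag the rsb for dlm_recover_grant() */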
5657 if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5658 rsb_set_flag(r, RSB_RECOVER_GRANT);
5659
5660 out_remid:
5661 /* this is the new value returned to the lock holder for
5662 saving in its process-copy lkb */
5663 *rl_remid = cpu_to_le32(lkb->lkb_id);
5664
5665 lkb->lkb_recover_seq = ls->ls_recover_seq;
5666
5667 out_unlock:
5668 unlock_rsb(r);
5669 put_rsb(r);
5670 out:
5671 if (error && error != -EEXIST)
5672 log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5673 from_nodeid, remid, error);
5674 *rl_result = cpu_to_le32(error);
5675 return error;
5676 }
5677
5678 /* needs at least dlm_rcom + rcom_lock */
5679 int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5680 uint64_t seq)
5681 {
5682 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5683 struct dlm_rsb *r;
5684 struct dlm_lkb *lkb;
5685 uint32_t lkid, remid;
5686 int error, result;
5687
5688 lkid = le32_to_cpu(rl->rl_lkid);
5689 remid = le32_to_cpu(rl->rl_remid);
5690 result = le32_to_cpu(rl->rl_result);
5691
5692 error = find_lkb(ls, lkid, &lkb);
5693 if (error) {
5694 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5695 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5696 result);
5697 return error;
5698 }
5699
5700 r = lkb->lkb_resource;
5701 hold_rsb(r);
5702 lock_rsb(r);
5703
5704 if (!is_process_copy(lkb)) {
5705 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5706 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5707 result);
5708 dlm_dump_rsb(r);
5709 unlock_rsb(r);
5710 put_rsb(r);
5711 dlm_put_lkb(lkb);
5712 return -EINVAL;
5713 }
5714
5715 switch (result) {
5716 case -EBADR:
5717 /* There's a chance the new master received our lock before
5718 		   dlm_recover_master_reply(); this wouldn't happen if we did
5719 a barrier between recover_masters and recover_locks. */
5720
5721 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5722 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5723 result);
5724
5725 dlm_send_rcom_lock(r, lkb, seq);
5726 goto out;
5727 case -EEXIST:
5728 case 0:
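		/* the new master accepted the lock (or already had it); save the lkid it returned */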
5729 lkb->lkb_remid = remid;
5730 break;
5731 default:
5732 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5733 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5734 result);
5735 }
5736
5737 /* an ack for dlm_recover_locks() which waits for replies from
5738 all the locks it sends to new masters */
5739 dlm_recovered_lock(r);
5740 out:
5741 unlock_rsb(r);
5742 put_rsb(r);
5743 dlm_put_lkb(lkb);
5744
5745 return 0;
5746 }
5747
5748 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5749 int mode, uint32_t flags, void *name, unsigned int namelen)
5750 {
5751 struct dlm_lkb *lkb;
5752 struct dlm_args args;
5753 bool do_put = true;
5754 int error;
5755
5756 dlm_lock_recovery(ls);
5757
5758 error = create_lkb(ls, &lkb);
5759 if (error) {
5760 kfree(ua);
5761 goto out;
5762 }
5763
5764 trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
5765
5766 if (flags & DLM_LKF_VALBLK) {
5767 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5768 if (!ua->lksb.sb_lvbptr) {
5769 kfree(ua);
5770 error = -ENOMEM;
5771 goto out_put;
5772 }
5773 }
5774 error = set_lock_args(mode, &ua->lksb, flags, namelen, fake_astfn, ua,
5775 fake_bastfn, &args);
5776 if (error) {
5777 kfree(ua->lksb.sb_lvbptr);
5778 ua->lksb.sb_lvbptr = NULL;
5779 kfree(ua);
5780 goto out_put;
5781 }
5782
5783 /* After ua is attached to lkb it will be freed by dlm_free_lkb().
5784 When DLM_DFL_USER_BIT is set, the dlm knows that this is a userspace
5785 lock and that lkb_astparam is the dlm_user_args structure. */
5786 set_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags);
5787 error = request_lock(ls, lkb, name, namelen, &args);
5788
5789 switch (error) {
5790 case 0:
5791 break;
5792 case -EINPROGRESS:
5793 error = 0;
5794 break;
5795 case -EAGAIN:
5796 error = 0;
5797 fallthrough;
5798 default:
5799 goto out_put;
5800 }
5801
5802 /* add this new lkb to the per-process list of locks */
5803 spin_lock_bh(&ua->proc->locks_spin);
5804 hold_lkb(lkb);
5805 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5806 spin_unlock_bh(&ua->proc->locks_spin);
5807 do_put = false;
5808 out_put:
5809 trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
5810 if (do_put)
5811 __put_lkb(ls, lkb);
5812 out:
5813 dlm_unlock_recovery(ls);
5814 return error;
5815 }
5816
5817 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5818 int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
5819 {
5820 struct dlm_lkb *lkb;
5821 struct dlm_args args;
5822 struct dlm_user_args *ua;
5823 int error;
5824
5825 dlm_lock_recovery(ls);
5826
5827 error = find_lkb(ls, lkid, &lkb);
5828 if (error)
5829 goto out;
5830
5831 trace_dlm_lock_start(ls, lkb, NULL, 0, mode, flags);
5832
5833 /* user can change the params on its lock when it converts it, or
5834 add an lvb that didn't exist before */
5835
5836 ua = lkb->lkb_ua;
5837
5838 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5839 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5840 if (!ua->lksb.sb_lvbptr) {
5841 error = -ENOMEM;
5842 goto out_put;
5843 }
5844 }
5845 if (lvb_in && ua->lksb.sb_lvbptr)
5846 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5847
5848 ua->xid = ua_tmp->xid;
5849 ua->castparam = ua_tmp->castparam;
5850 ua->castaddr = ua_tmp->castaddr;
5851 ua->bastparam = ua_tmp->bastparam;
5852 ua->bastaddr = ua_tmp->bastaddr;
5853 ua->user_lksb = ua_tmp->user_lksb;
5854
5855 error = set_lock_args(mode, &ua->lksb, flags, 0, fake_astfn, ua,
5856 fake_bastfn, &args);
5857 if (error)
5858 goto out_put;
5859
5860 error = convert_lock(ls, lkb, &args);
5861
5862 if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5863 error = 0;
5864 out_put:
5865 trace_dlm_lock_end(ls, lkb, NULL, 0, mode, flags, error, false);
5866 dlm_put_lkb(lkb);
5867 out:
5868 dlm_unlock_recovery(ls);
5869 kfree(ua_tmp);
5870 return error;
5871 }
5872
5873 /*
5874 * The caller asks for an orphan lock on a given resource with a given mode.
5875 * If a matching lock exists, it's moved to the owner's list of locks and
5876 * the lkid is returned.
5877 */
5878
5879 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5880 int mode, uint32_t flags, void *name, unsigned int namelen,
5881 uint32_t *lkid)
5882 {
5883 struct dlm_lkb *lkb = NULL, *iter;
5884 struct dlm_user_args *ua;
5885 int found_other_mode = 0;
5886 int rv = 0;
5887
5888 spin_lock_bh(&ls->ls_orphans_lock);
5889 list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
5890 if (iter->lkb_resource->res_length != namelen)
5891 continue;
5892 if (memcmp(iter->lkb_resource->res_name, name, namelen))
5893 continue;
5894 if (iter->lkb_grmode != mode) {
5895 found_other_mode = 1;
5896 continue;
5897 }
5898
5899 lkb = iter;
5900 list_del_init(&iter->lkb_ownqueue);
5901 clear_bit(DLM_DFL_ORPHAN_BIT, &iter->lkb_dflags);
5902 *lkid = iter->lkb_id;
5903 break;
5904 }
5905 spin_unlock_bh(&ls->ls_orphans_lock);
5906
5907 if (!lkb && found_other_mode) {
5908 rv = -EAGAIN;
5909 goto out;
5910 }
5911
5912 if (!lkb) {
5913 rv = -ENOENT;
5914 goto out;
5915 }
5916
5917 lkb->lkb_exflags = flags;
5918 lkb->lkb_ownpid = (int) current->pid;
5919
5920 ua = lkb->lkb_ua;
5921
5922 ua->proc = ua_tmp->proc;
5923 ua->xid = ua_tmp->xid;
5924 ua->castparam = ua_tmp->castparam;
5925 ua->castaddr = ua_tmp->castaddr;
5926 ua->bastparam = ua_tmp->bastparam;
5927 ua->bastaddr = ua_tmp->bastaddr;
5928 ua->user_lksb = ua_tmp->user_lksb;
5929
5930 /*
5931 * The lkb reference from the ls_orphans list was not
5932 * removed above, and is now considered the reference
5933 * for the proc locks list.
5934 */
5935
5936 spin_lock_bh(&ua->proc->locks_spin);
5937 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5938 spin_unlock_bh(&ua->proc->locks_spin);
5939 out:
5940 kfree(ua_tmp);
5941 return rv;
5942 }
5943
5944 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5945 uint32_t flags, uint32_t lkid, char *lvb_in)
5946 {
5947 struct dlm_lkb *lkb;
5948 struct dlm_args args;
5949 struct dlm_user_args *ua;
5950 int error;
5951
5952 dlm_lock_recovery(ls);
5953
5954 error = find_lkb(ls, lkid, &lkb);
5955 if (error)
5956 goto out;
5957
5958 trace_dlm_unlock_start(ls, lkb, flags);
5959
5960 ua = lkb->lkb_ua;
5961
5962 if (lvb_in && ua->lksb.sb_lvbptr)
5963 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5964 if (ua_tmp->castparam)
5965 ua->castparam = ua_tmp->castparam;
5966 ua->user_lksb = ua_tmp->user_lksb;
5967
5968 error = set_unlock_args(flags, ua, &args);
5969 if (error)
5970 goto out_put;
5971
5972 error = unlock_lock(ls, lkb, &args);
5973
5974 if (error == -DLM_EUNLOCK)
5975 error = 0;
5976 /* from validate_unlock_args() */
5977 if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5978 error = 0;
5979 if (error)
5980 goto out_put;
5981
5982 spin_lock_bh(&ua->proc->locks_spin);
5983 /* dlm_user_add_cb() may have already taken lkb off the proc list */
5984 if (!list_empty(&lkb->lkb_ownqueue))
5985 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5986 spin_unlock_bh(&ua->proc->locks_spin);
5987 out_put:
5988 trace_dlm_unlock_end(ls, lkb, flags, error);
5989 dlm_put_lkb(lkb);
5990 out:
5991 dlm_unlock_recovery(ls);
5992 kfree(ua_tmp);
5993 return error;
5994 }
5995
5996 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5997 uint32_t flags, uint32_t lkid)
5998 {
5999 struct dlm_lkb *lkb;
6000 struct dlm_args args;
6001 struct dlm_user_args *ua;
6002 int error;
6003
6004 dlm_lock_recovery(ls);
6005
6006 error = find_lkb(ls, lkid, &lkb);
6007 if (error)
6008 goto out;
6009
6010 trace_dlm_unlock_start(ls, lkb, flags);
6011
6012 ua = lkb->lkb_ua;
6013 if (ua_tmp->castparam)
6014 ua->castparam = ua_tmp->castparam;
6015 ua->user_lksb = ua_tmp->user_lksb;
6016
6017 error = set_unlock_args(flags, ua, &args);
6018 if (error)
6019 goto out_put;
6020
6021 error = cancel_lock(ls, lkb, &args);
6022
6023 if (error == -DLM_ECANCEL)
6024 error = 0;
6025 /* from validate_unlock_args() */
6026 if (error == -EBUSY)
6027 error = 0;
6028 out_put:
6029 trace_dlm_unlock_end(ls, lkb, flags, error);
6030 dlm_put_lkb(lkb);
6031 out:
6032 dlm_unlock_recovery(ls);
6033 kfree(ua_tmp);
6034 return error;
6035 }
6036
6037 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
6038 {
6039 struct dlm_lkb *lkb;
6040 struct dlm_args args;
6041 struct dlm_user_args *ua;
6042 struct dlm_rsb *r;
6043 int error;
6044
6045 dlm_lock_recovery(ls);
6046
6047 error = find_lkb(ls, lkid, &lkb);
6048 if (error)
6049 goto out;
6050
6051 trace_dlm_unlock_start(ls, lkb, flags);
6052
6053 ua = lkb->lkb_ua;
6054
6055 error = set_unlock_args(flags, ua, &args);
6056 if (error)
6057 goto out_put;
6058
6059 /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
6060
6061 r = lkb->lkb_resource;
6062 hold_rsb(r);
6063 lock_rsb(r);
6064
6065 error = validate_unlock_args(lkb, &args);
6066 if (error)
6067 goto out_r;
6068 set_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags);
6069
6070 error = _cancel_lock(r, lkb);
6071 out_r:
6072 unlock_rsb(r);
6073 put_rsb(r);
6074
6075 if (error == -DLM_ECANCEL)
6076 error = 0;
6077 /* from validate_unlock_args() */
6078 if (error == -EBUSY)
6079 error = 0;
6080 out_put:
6081 trace_dlm_unlock_end(ls, lkb, flags, error);
6082 dlm_put_lkb(lkb);
6083 out:
6084 dlm_unlock_recovery(ls);
6085 return error;
6086 }
6087
6088 /* lkb's that are removed from the waiters list by revert are just left on the
6089 orphans list with the granted orphan locks, to be freed by purge */
6090
6091 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6092 {
6093 struct dlm_args args;
6094 int error;
6095
6096 hold_lkb(lkb); /* reference for the ls_orphans list */
6097 spin_lock_bh(&ls->ls_orphans_lock);
6098 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6099 spin_unlock_bh(&ls->ls_orphans_lock);
6100
6101 set_unlock_args(0, lkb->lkb_ua, &args);
6102
6103 error = cancel_lock(ls, lkb, &args);
6104 if (error == -DLM_ECANCEL)
6105 error = 0;
6106 return error;
6107 }
6108
6109 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
6110 granted. Regardless of what rsb queue the lock is on, it's removed and
6111 freed. The IVVALBLK flag causes the lvb on the resource to be invalidated
6112    if our lock is PW/EX (it's ignored if our granted mode is smaller). */
6113
6114 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6115 {
6116 struct dlm_args args;
6117 int error;
6118
6119 set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
6120 lkb->lkb_ua, &args);
6121
6122 error = unlock_lock(ls, lkb, &args);
6123 if (error == -DLM_EUNLOCK)
6124 error = 0;
6125 return error;
6126 }
6127
6128 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
6129 (which does lock_rsb) due to deadlock with receiving a message that does
6130 lock_rsb followed by dlm_user_add_cb() */
6131
6132 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
6133 struct dlm_user_proc *proc)
6134 {
6135 struct dlm_lkb *lkb = NULL;
6136
6137 spin_lock_bh(&ls->ls_clear_proc_locks);
6138 if (list_empty(&proc->locks))
6139 goto out;
6140
6141 lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
6142 list_del_init(&lkb->lkb_ownqueue);
6143
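	/* persistent locks become orphans that can be adopted later;
	   all others are marked dead and force-unlocked by the caller */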
6144 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6145 set_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags);
6146 else
6147 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6148 out:
6149 spin_unlock_bh(&ls->ls_clear_proc_locks);
6150 return lkb;
6151 }
6152
6153 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
6154 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
6155 which we clear here. */
6156
6157 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
6158 list, and no more device_writes should add lkb's to proc->locks list; so we
6159 shouldn't need to take asts_spin or locks_spin here. this assumes that
6160 device reads/writes/closes are serialized -- FIXME: we may need to serialize
6161    them ourselves. */
6162
6163 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6164 {
6165 struct dlm_callback *cb, *cb_safe;
6166 struct dlm_lkb *lkb, *safe;
6167
6168 dlm_lock_recovery(ls);
6169
6170 while (1) {
6171 lkb = del_proc_lock(ls, proc);
6172 if (!lkb)
6173 break;
6174 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6175 orphan_proc_lock(ls, lkb);
6176 else
6177 unlock_proc_lock(ls, lkb);
6178
6179 /* this removes the reference for the proc->locks list
6180 added by dlm_user_request, it may result in the lkb
6181 being freed */
6182
6183 dlm_put_lkb(lkb);
6184 }
6185
6186 spin_lock_bh(&ls->ls_clear_proc_locks);
6187
6188 /* in-progress unlocks */
6189 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6190 list_del_init(&lkb->lkb_ownqueue);
6191 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6192 dlm_put_lkb(lkb);
6193 }
6194
6195 list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6196 list_del(&cb->list);
6197 dlm_free_cb(cb);
6198 }
6199
6200 spin_unlock_bh(&ls->ls_clear_proc_locks);
6201 dlm_unlock_recovery(ls);
6202 }
6203
6204 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6205 {
6206 struct dlm_callback *cb, *cb_safe;
6207 struct dlm_lkb *lkb, *safe;
6208
6209 while (1) {
6210 lkb = NULL;
6211 spin_lock_bh(&proc->locks_spin);
6212 if (!list_empty(&proc->locks)) {
6213 lkb = list_entry(proc->locks.next, struct dlm_lkb,
6214 lkb_ownqueue);
6215 list_del_init(&lkb->lkb_ownqueue);
6216 }
6217 spin_unlock_bh(&proc->locks_spin);
6218
6219 if (!lkb)
6220 break;
6221
6222 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6223 unlock_proc_lock(ls, lkb);
6224 dlm_put_lkb(lkb); /* ref from proc->locks list */
6225 }
6226
6227 spin_lock_bh(&proc->locks_spin);
6228 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6229 list_del_init(&lkb->lkb_ownqueue);
6230 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6231 dlm_put_lkb(lkb);
6232 }
6233 spin_unlock_bh(&proc->locks_spin);
6234
6235 spin_lock_bh(&proc->asts_spin);
6236 list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6237 list_del(&cb->list);
6238 dlm_free_cb(cb);
6239 }
6240 spin_unlock_bh(&proc->asts_spin);
6241 }
6242
6243 /* pid of 0 means purge all orphans */
6244
6245 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6246 {
6247 struct dlm_lkb *lkb, *safe;
6248
6249 spin_lock_bh(&ls->ls_orphans_lock);
6250 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6251 if (pid && lkb->lkb_ownpid != pid)
6252 continue;
6253 unlock_proc_lock(ls, lkb);
6254 list_del_init(&lkb->lkb_ownqueue);
6255 dlm_put_lkb(lkb);
6256 }
6257 spin_unlock_bh(&ls->ls_orphans_lock);
6258 }
6259
6260 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6261 {
6262 struct dlm_message *ms;
6263 struct dlm_mhandle *mh;
6264 int error;
6265
6266 error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6267 DLM_MSG_PURGE, &ms, &mh);
6268 if (error)
6269 return error;
6270 ms->m_nodeid = cpu_to_le32(nodeid);
6271 ms->m_pid = cpu_to_le32(pid);
6272
6273 return send_message(mh, ms, NULL, 0);
6274 }
6275
6276 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6277 int nodeid, int pid)
6278 {
6279 int error = 0;
6280
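	/* a purge directed at another node is sent as a DLM_MSG_PURGE message;
	   otherwise it's handled locally */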
6281 if (nodeid && (nodeid != dlm_our_nodeid())) {
6282 error = send_purge(ls, nodeid, pid);
6283 } else {
6284 dlm_lock_recovery(ls);
6285 if (pid == current->pid)
6286 purge_proc_locks(ls, proc);
6287 else
6288 do_purge(ls, nodeid, pid);
6289 dlm_unlock_recovery(ls);
6290 }
6291 return error;
6292 }
6293
6294 /* debug functionality */
6295 int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
6296 int lkb_nodeid, unsigned int lkb_dflags, int lkb_status)
6297 {
6298 struct dlm_lksb *lksb;
6299 struct dlm_lkb *lkb;
6300 struct dlm_rsb *r;
6301 int error;
6302
6303 /* we currently can't set a valid user lock */
6304 if (lkb_dflags & BIT(DLM_DFL_USER_BIT))
6305 return -EOPNOTSUPP;
6306
6307 lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
6308 if (!lksb)
6309 return -ENOMEM;
6310
6311 error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
6312 if (error) {
6313 kfree(lksb);
6314 return error;
6315 }
6316
6317 dlm_set_dflags_val(lkb, lkb_dflags);
6318 lkb->lkb_nodeid = lkb_nodeid;
6319 lkb->lkb_lksb = lksb;
6320 /* user specific pointer, just don't have it NULL for kernel locks */
6321 if (~lkb_dflags & BIT(DLM_DFL_USER_BIT))
6322 lkb->lkb_astparam = (void *)0xDEADBEEF;
6323
6324 error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
6325 if (error) {
6326 kfree(lksb);
6327 __put_lkb(ls, lkb);
6328 return error;
6329 }
6330
6331 lock_rsb(r);
6332 attach_lkb(r, lkb);
6333 add_lkb(r, lkb, lkb_status);
6334 unlock_rsb(r);
6335 put_rsb(r);
6336
6337 return 0;
6338 }
6339
6340 int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
6341 int mstype, int to_nodeid)
6342 {
6343 struct dlm_lkb *lkb;
6344 int error;
6345
6346 error = find_lkb(ls, lkb_id, &lkb);
6347 if (error)
6348 return error;
6349
6350 add_to_waiters(lkb, mstype, to_nodeid);
6351 dlm_put_lkb(lkb);
6352 return 0;
6353 }
6354
6355