1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 ** Copyright (C) 2005-2010 Red Hat, Inc. All rights reserved.
6 **
7 **
8 *******************************************************************************
9 ******************************************************************************/
10
11 /* Central locking logic has four stages:
12
13 dlm_lock()
14 dlm_unlock()
15
16 request_lock(ls, lkb)
17 convert_lock(ls, lkb)
18 unlock_lock(ls, lkb)
19 cancel_lock(ls, lkb)
20
21 _request_lock(r, lkb)
22 _convert_lock(r, lkb)
23 _unlock_lock(r, lkb)
24 _cancel_lock(r, lkb)
25
26 do_request(r, lkb)
27 do_convert(r, lkb)
28 do_unlock(r, lkb)
29 do_cancel(r, lkb)
30
31 Stage 1 (lock, unlock) is mainly about checking input args and
32 splitting into one of the four main operations:
33
34 dlm_lock = request_lock
35 dlm_lock+CONVERT = convert_lock
36 dlm_unlock = unlock_lock
37 dlm_unlock+CANCEL = cancel_lock
38
39 Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
40 provided to the next stage.
41
42 Stage 3, _xxxx_lock(), determines if the operation is local or remote.
43 When remote, it calls send_xxxx(), when local it calls do_xxxx().
44
45 Stage 4, do_xxxx(), is the guts of the operation. It manipulates the
46 given rsb and lkb and queues callbacks.
47
48 For remote operations, send_xxxx() results in the corresponding do_xxxx()
49 function being executed on the remote node. The connecting send/receive
50 calls on local (L) and remote (R) nodes:
51
52 L: send_xxxx() -> R: receive_xxxx()
53 R: do_xxxx()
54 L: receive_xxxx_reply() <- R: send_xxxx_reply()
55 */
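/* For illustration (instantiating the xxxx pattern above for a new request;
   the other three operations follow the same shape):

	dlm_lock()
	  request_lock(ls, lkb)
	    _request_lock(r, lkb)
	      do_request(r, lkb)      local master: grant/queue and callback
	      send_request(r, lkb)    remote master: do_request() runs there,
	                              the result comes back via
	                              receive_request_reply()
*/
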
56 #include <trace/events/dlm.h>
57
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "midcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89 const struct dlm_message *ms, bool local);
90 static int receive_extralen(const struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void deactivate_rsb(struct kref *kref);
93
94 /*
95 * Lock compatibility matrix - thanks Steve
96 * UN = Unlocked state. Not really a state, used as a flag
97 * PD = Padding. Used to make the matrix a nice power of two in size
98 * Other states are the same as the VMS DLM.
99 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same)
100 */
101
102 static const int __dlm_compat_matrix[8][8] = {
103 /* UN NL CR CW PR PW EX PD */
104 {1, 1, 1, 1, 1, 1, 1, 0}, /* UN */
105 {1, 1, 1, 1, 1, 1, 1, 0}, /* NL */
106 {1, 1, 1, 1, 1, 1, 0, 0}, /* CR */
107 {1, 1, 1, 1, 0, 0, 0, 0}, /* CW */
108 {1, 1, 1, 0, 1, 0, 0, 0}, /* PR */
109 {1, 1, 1, 0, 0, 0, 0, 0}, /* PW */
110 {1, 1, 0, 0, 0, 0, 0, 0}, /* EX */
111 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */
112 };
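
/* A few worked examples, assuming the standard DLM mode values
   (DLM_LOCK_NL=0, CR=1, CW=2, PR=3, PW=4, EX=5):

	__dlm_compat_matrix[DLM_LOCK_PR + 1][DLM_LOCK_PR + 1] == 1   PR coexists with PR
	__dlm_compat_matrix[DLM_LOCK_PR + 1][DLM_LOCK_CW + 1] == 0   PR blocks a CW request
	__dlm_compat_matrix[DLM_LOCK_EX + 1][DLM_LOCK_NL + 1] == 1   EX still allows NL
*/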
113
114 /*
115 * This defines the direction of transfer of LVB data.
116 * Granted mode is the row; requested mode is the column.
117 * Usage: matrix[grmode+1][rqmode+1]
118 * 1 = LVB is returned to the caller
119 * 0 = LVB is written to the resource
120 * -1 = nothing happens to the LVB
121 */
122
123 const int dlm_lvb_operations[8][8] = {
124 /* UN NL CR CW PR PW EX PD*/
125 { -1, 1, 1, 1, 1, 1, 1, -1 }, /* UN */
126 { -1, 1, 1, 1, 1, 1, 1, 0 }, /* NL */
127 { -1, -1, 1, 1, 1, 1, 1, 0 }, /* CR */
128 { -1, -1, -1, 1, 1, 1, 1, 0 }, /* CW */
129 { -1, -1, -1, -1, 1, 1, 1, 0 }, /* PR */
130 { -1, 0, 0, 0, 0, 0, 1, 0 }, /* PW */
131 { -1, 0, 0, 0, 0, 0, 0, 0 }, /* EX */
132 { -1, 0, 0, 0, 0, 0, 0, 0 } /* PD */
133 };
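
/* Worked examples, using the same matrix[grmode+1][rqmode+1] indexing:

	dlm_lvb_operations[DLM_LOCK_NL + 1][DLM_LOCK_EX + 1] == 1
		converting up from NL to EX: the resource's LVB is returned to the caller
	dlm_lvb_operations[DLM_LOCK_EX + 1][DLM_LOCK_NL + 1] == 0
		converting down from EX to NL: the caller's LVB is written to the resource
	dlm_lvb_operations[DLM_LOCK_PR + 1][DLM_LOCK_NL + 1] == -1
		converting down from PR to NL: the LVB is left untouched
*/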
134
135 #define modes_compat(gr, rq) \
136 __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
137
138 int dlm_modes_compat(int mode1, int mode2)
139 {
140 return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
141 }
142
143 /*
144 * Compatibility matrix for conversions with QUECVT set.
145 * Granted mode is the row; requested mode is the column.
146 * Usage: matrix[grmode+1][rqmode+1]
147 */
148
149 static const int __quecvt_compat_matrix[8][8] = {
150 /* UN NL CR CW PR PW EX PD */
151 {0, 0, 0, 0, 0, 0, 0, 0}, /* UN */
152 {0, 0, 1, 1, 1, 1, 1, 0}, /* NL */
153 {0, 0, 0, 1, 1, 1, 1, 0}, /* CR */
154 {0, 0, 0, 0, 1, 1, 1, 0}, /* CW */
155 {0, 0, 0, 1, 0, 1, 1, 0}, /* PR */
156 {0, 0, 0, 0, 0, 0, 1, 0}, /* PW */
157 {0, 0, 0, 0, 0, 0, 0, 0}, /* EX */
158 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */
159 };
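
/* For example, with DLM_LKF_QUECVT set, a conversion from NL up to EX may be
   queued (__quecvt_compat_matrix[DLM_LOCK_NL + 1][DLM_LOCK_EX + 1] == 1),
   while a down-conversion such as EX to NL is not a valid QUECVT conversion
   (__quecvt_compat_matrix[DLM_LOCK_EX + 1][DLM_LOCK_NL + 1] == 0). */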
160
161 void dlm_print_lkb(struct dlm_lkb *lkb)
162 {
163 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
164 "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
165 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166 dlm_iflags_val(lkb), lkb->lkb_status, lkb->lkb_rqmode,
167 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
168 (unsigned long long)lkb->lkb_recover_seq);
169 }
170
171 static void dlm_print_rsb(struct dlm_rsb *r)
172 {
173 printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
174 "rlc %d name %s\n",
175 r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
176 r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
177 r->res_name);
178 }
179
180 void dlm_dump_rsb(struct dlm_rsb *r)
181 {
182 struct dlm_lkb *lkb;
183
184 dlm_print_rsb(r);
185
186 printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
187 list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
188 printk(KERN_ERR "rsb lookup list\n");
189 list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
190 dlm_print_lkb(lkb);
191 printk(KERN_ERR "rsb grant queue:\n");
192 list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
193 dlm_print_lkb(lkb);
194 printk(KERN_ERR "rsb convert queue:\n");
195 list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
196 dlm_print_lkb(lkb);
197 printk(KERN_ERR "rsb wait queue:\n");
198 list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
199 dlm_print_lkb(lkb);
200 }
201
202 /* Threads cannot use the lockspace while it's being recovered */
203
204 void dlm_lock_recovery(struct dlm_ls *ls)
205 {
206 down_read(&ls->ls_in_recovery);
207 }
208
209 void dlm_unlock_recovery(struct dlm_ls *ls)
210 {
211 up_read(&ls->ls_in_recovery);
212 }
213
214 int dlm_lock_recovery_try(struct dlm_ls *ls)
215 {
216 return down_read_trylock(&ls->ls_in_recovery);
217 }
218
219 static inline int can_be_queued(struct dlm_lkb *lkb)
220 {
221 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
222 }
223
224 static inline int force_blocking_asts(struct dlm_lkb *lkb)
225 {
226 return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
227 }
228
229 static inline int is_demoted(struct dlm_lkb *lkb)
230 {
231 return test_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
232 }
233
234 static inline int is_altmode(struct dlm_lkb *lkb)
235 {
236 return test_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
237 }
238
239 static inline int is_granted(struct dlm_lkb *lkb)
240 {
241 return (lkb->lkb_status == DLM_LKSTS_GRANTED);
242 }
243
244 static inline int is_remote(struct dlm_rsb *r)
245 {
246 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
247 return !!r->res_nodeid;
248 }
249
250 static inline int is_process_copy(struct dlm_lkb *lkb)
251 {
252 return lkb->lkb_nodeid &&
253 !test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
254 }
255
256 static inline int is_master_copy(struct dlm_lkb *lkb)
257 {
258 return test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
259 }
260
261 static inline int middle_conversion(struct dlm_lkb *lkb)
262 {
263 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265 return 1;
266 return 0;
267 }
268
269 static inline int down_conversion(struct dlm_lkb *lkb)
270 {
271 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272 }
273
274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
275 {
276 return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
277 }
278
279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
280 {
281 return test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
282 }
283
284 static inline int is_overlap(struct dlm_lkb *lkb)
285 {
286 return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags) ||
287 test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
288 }
289
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291 {
292 if (is_master_copy(lkb))
293 return;
294
295 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
296
297 if (rv == -DLM_ECANCEL &&
298 test_and_clear_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags))
299 rv = -EDEADLK;
300
301 dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, dlm_sbflags_val(lkb));
302 }
303
304 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
305 {
306 queue_cast(r, lkb,
307 is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
308 }
309
310 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
311 {
312 if (is_master_copy(lkb)) {
313 send_bast(r, lkb, rqmode);
314 } else {
315 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
316 }
317 }
318
319 /*
320 * Basic operations on rsb's and lkb's
321 */
322
323 static inline unsigned long rsb_toss_jiffies(void)
324 {
325 return jiffies + (READ_ONCE(dlm_config.ci_toss_secs) * HZ);
326 }
327
328 /* This is only called to add a reference when the code already holds
329 a valid reference to the rsb, so there's no need for locking. */
330
331 static inline void hold_rsb(struct dlm_rsb *r)
332 {
333 /* inactive rsbs are not ref counted */
334 WARN_ON(rsb_flag(r, RSB_INACTIVE));
335 kref_get(&r->res_ref);
336 }
337
338 void dlm_hold_rsb(struct dlm_rsb *r)
339 {
340 hold_rsb(r);
341 }
342
343 /* TODO move this to lib/refcount.c */
344 static __must_check bool
345 dlm_refcount_dec_and_write_lock_bh(refcount_t *r, rwlock_t *lock)
346 __cond_acquires(lock)
347 {
348 if (refcount_dec_not_one(r))
349 return false;
350
351 write_lock_bh(lock);
352 if (!refcount_dec_and_test(r)) {
353 write_unlock_bh(lock);
354 return false;
355 }
356
357 return true;
358 }
359
360 /* TODO move this to include/linux/kref.h */
361 static inline int dlm_kref_put_write_lock_bh(struct kref *kref,
362 void (*release)(struct kref *kref),
363 rwlock_t *lock)
364 {
365 if (dlm_refcount_dec_and_write_lock_bh(&kref->refcount, lock)) {
366 release(kref);
367 return 1;
368 }
369
370 return 0;
371 }
372
373 static void put_rsb(struct dlm_rsb *r)
374 {
375 struct dlm_ls *ls = r->res_ls;
376 int rv;
377
378 rv = dlm_kref_put_write_lock_bh(&r->res_ref, deactivate_rsb,
379 &ls->ls_rsbtbl_lock);
380 if (rv)
381 write_unlock_bh(&ls->ls_rsbtbl_lock);
382 }
383
384 void dlm_put_rsb(struct dlm_rsb *r)
385 {
386 put_rsb(r);
387 }
388
389 /* Works together with timer_delete_sync() in dlm_ls_stop(): once recovery
390  * is triggered, no new timers are armed, and they are not run again until
391  * a resume_scan_timer() call re-enables them.
392  */
393 static void enable_scan_timer(struct dlm_ls *ls, unsigned long jiffies)
394 {
395 if (!dlm_locking_stopped(ls))
396 mod_timer(&ls->ls_scan_timer, jiffies);
397 }
398
399 /* This function tries to resume the timer callback if an rsb
400  * is on the scan list and no timer is pending. The first entry
401  * might currently be executing as the timer callback, but we
402  * don't care if the timer is queued up again and then does
403  * nothing. Should be a rare case.
404  */
405 void resume_scan_timer(struct dlm_ls *ls)
406 {
407 struct dlm_rsb *r;
408
409 spin_lock_bh(&ls->ls_scan_lock);
410 r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
411 res_scan_list);
412 if (r && !timer_pending(&ls->ls_scan_timer))
413 enable_scan_timer(ls, r->res_toss_time);
414 spin_unlock_bh(&ls->ls_scan_lock);
415 }
416
417 /* ls_rsbtbl_lock must be held */
418
419 static void del_scan(struct dlm_ls *ls, struct dlm_rsb *r)
420 {
421 struct dlm_rsb *first;
422
423 /* active rsbs should never be on the scan list */
424 WARN_ON(!rsb_flag(r, RSB_INACTIVE));
425
426 spin_lock_bh(&ls->ls_scan_lock);
427 r->res_toss_time = 0;
428
429 /* if the rsb is not queued do nothing */
430 if (list_empty(&r->res_scan_list))
431 goto out;
432
433 /* get the first element before delete */
434 first = list_first_entry(&ls->ls_scan_list, struct dlm_rsb,
435 res_scan_list);
436 list_del_init(&r->res_scan_list);
437 /* check if the first element was the rsb we deleted */
438 if (first == r) {
439 /* try to get the new first element, if the list
440 * is empty now try to delete the timer, if we are
441 * too late we don't care.
442 *
443 * if the list isn't empty and a new first element got
444 * in place, set the new timer expire time.
445 */
446 first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
447 res_scan_list);
448 if (!first)
449 timer_delete(&ls->ls_scan_timer);
450 else
451 enable_scan_timer(ls, first->res_toss_time);
452 }
453
454 out:
455 spin_unlock_bh(&ls->ls_scan_lock);
456 }
457
458 static void add_scan(struct dlm_ls *ls, struct dlm_rsb *r)
459 {
460 int our_nodeid = dlm_our_nodeid();
461 struct dlm_rsb *first;
462
463 /* A dir record for a remote master rsb should never be on the scan list. */
464 WARN_ON(!dlm_no_directory(ls) &&
465 (r->res_master_nodeid != our_nodeid) &&
466 (dlm_dir_nodeid(r) == our_nodeid));
467
468 /* An active rsb should never be on the scan list. */
469 WARN_ON(!rsb_flag(r, RSB_INACTIVE));
470
471 /* An rsb should not already be on the scan list. */
472 WARN_ON(!list_empty(&r->res_scan_list));
473
474 spin_lock_bh(&ls->ls_scan_lock);
475 /* set the new rsb absolute expire time in the rsb */
476 r->res_toss_time = rsb_toss_jiffies();
477 if (list_empty(&ls->ls_scan_list)) {
478 /* if the queue is empty, add the element; its
479  * expire time becomes our next expiration
480  */
481 list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
482 enable_scan_timer(ls, r->res_toss_time);
483 } else {
484 /* get the current first element, then add this rsb
485  * (which has the latest expire time) to the end of
486  * the queue. If the list was empty before, this rsb's
487  * expire time is our next expiration; if it wasn't,
488  * the (possibly new) first element's expire time is.
489  */
490 first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
491 res_scan_list);
492 list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
493 if (!first)
494 enable_scan_timer(ls, r->res_toss_time);
495 else
496 enable_scan_timer(ls, first->res_toss_time);
497 }
498 spin_unlock_bh(&ls->ls_scan_lock);
499 }
500
501 /* If we hit contention, retry the trylock after 250 ms.
502  * If any other mod_timer() happens in between and makes the
503  * timer expire earlier, we don't care; this is only for the
504  * unlikely case that nothing else happened in that time.
505  */
506 #define DLM_TOSS_TIMER_RETRY (jiffies + msecs_to_jiffies(250))
507
508 /* Called by lockspace scan_timer to free unused rsb's. */
509
510 void dlm_rsb_scan(struct timer_list *timer)
511 {
512 struct dlm_ls *ls = from_timer(ls, timer, ls_scan_timer);
513 int our_nodeid = dlm_our_nodeid();
514 struct dlm_rsb *r;
515 int rv;
516
517 while (1) {
518 /* interruption point: leave the iteration when
519  * recovery waits for timer_delete_sync(); recovery
520  * will take care of deleting everything on the scan list.
521  */
522 if (dlm_locking_stopped(ls))
523 break;
524
525 rv = spin_trylock(&ls->ls_scan_lock);
526 if (!rv) {
527 /* rearm the retry timer */
528 enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
529 break;
530 }
531
532 r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
533 res_scan_list);
534 if (!r) {
535 /* the next add_scan will enable the timer again */
536 spin_unlock(&ls->ls_scan_lock);
537 break;
538 }
539
540 /*
541 * If the first rsb is not yet expired, then stop because the
542 * list is sorted with nearest expiration first.
543 */
544 if (time_before(jiffies, r->res_toss_time)) {
545 /* rearm with the next rsb to expire in the future */
546 enable_scan_timer(ls, r->res_toss_time);
547 spin_unlock(&ls->ls_scan_lock);
548 break;
549 }
550
551 /* find_rsb_dir/nodir take these two locks in the reverse
552  * order; since this is only a trylock, if we hit possible
553  * contention we simply try again later.
554  */
555 rv = write_trylock(&ls->ls_rsbtbl_lock);
556 if (!rv) {
557 spin_unlock(&ls->ls_scan_lock);
558 /* rearm the retry timer */
559 enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
560 break;
561 }
562
563 list_del(&r->res_slow_list);
564 rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
565 dlm_rhash_rsb_params);
566 rsb_clear_flag(r, RSB_HASHED);
567
568 /* ls_rsbtbl_lock is not needed when calling send_remove() */
569 write_unlock(&ls->ls_rsbtbl_lock);
570
571 list_del_init(&r->res_scan_list);
572 spin_unlock(&ls->ls_scan_lock);
573
574 /* An rsb that is a dir record for a remote master rsb
575 * cannot be removed, and should not have a timer enabled.
576 */
577 WARN_ON(!dlm_no_directory(ls) &&
578 (r->res_master_nodeid != our_nodeid) &&
579 (dlm_dir_nodeid(r) == our_nodeid));
580
581 /* We're the master of this rsb but we're not
582 * the directory record, so we need to tell the
583 * dir node to remove the dir record
584 */
585 if (!dlm_no_directory(ls) &&
586 (r->res_master_nodeid == our_nodeid) &&
587 (dlm_dir_nodeid(r) != our_nodeid))
588 send_remove(r);
589
590 free_inactive_rsb(r);
591 }
592 }
593
594 /* Allocate and initialize a new rsb for the given name. Returns
595    -ENOMEM if allocation fails. The caller is responsible for
596    inserting the rsb into ls_rsbtbl (see rsb_insert()). */
597
598 static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
599 struct dlm_rsb **r_ret)
600 {
601 struct dlm_rsb *r;
602
603 r = dlm_allocate_rsb();
604 if (!r)
605 return -ENOMEM;
606
607 r->res_ls = ls;
608 r->res_length = len;
609 memcpy(r->res_name, name, len);
610 spin_lock_init(&r->res_lock);
611
612 INIT_LIST_HEAD(&r->res_lookup);
613 INIT_LIST_HEAD(&r->res_grantqueue);
614 INIT_LIST_HEAD(&r->res_convertqueue);
615 INIT_LIST_HEAD(&r->res_waitqueue);
616 INIT_LIST_HEAD(&r->res_root_list);
617 INIT_LIST_HEAD(&r->res_scan_list);
618 INIT_LIST_HEAD(&r->res_recover_list);
619 INIT_LIST_HEAD(&r->res_masters_list);
620
621 *r_ret = r;
622 return 0;
623 }
624
625 int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
626 struct dlm_rsb **r_ret)
627 {
628 char key[DLM_RESNAME_MAXLEN] = {};
629
630 memcpy(key, name, len);
631 *r_ret = rhashtable_lookup_fast(rhash, &key, dlm_rhash_rsb_params);
632 if (*r_ret)
633 return 0;
634
635 return -EBADR;
636 }
637
638 static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
639 {
640 int rv;
641
642 rv = rhashtable_insert_fast(rhash, &rsb->res_node,
643 dlm_rhash_rsb_params);
644 if (!rv)
645 rsb_set_flag(rsb, RSB_HASHED);
646
647 return rv;
648 }
649
650 /*
651 * Find rsb in rsbtbl and potentially create/add one
652 *
653 * Delaying the release of rsb's has a similar benefit to applications keeping
654 * NL locks on an rsb, but without the guarantee that the cached master value
655 * will still be valid when the rsb is reused. Apps aren't always smart enough
656 * to keep NL locks on an rsb that they may lock again shortly; this can lead
657 * to excessive master lookups and removals if we don't delay the release.
658 *
659 * Searching for an rsb means looking through both the normal list and toss
660 * list. When found on the toss list the rsb is moved to the normal list with
661 * ref count of 1; when found on normal list the ref count is incremented.
662 *
663 * rsb's on the keep list are being used locally and refcounted.
664 * rsb's on the toss list are not being used locally, and are not refcounted.
665 *
666 * The toss list rsb's were either
667 * - previously used locally but not any more (were on keep list, then
668 * moved to toss list when last refcount dropped)
669 * - created and put on toss list as a directory record for a lookup
670 * (we are the dir node for the res, but are not using the res right now,
671 * but some other node is)
672 *
673 * The purpose of find_rsb() is to return a refcounted rsb for local use.
674 * So, if the given rsb is on the toss list, it is moved to the keep list
675 * before being returned.
676 *
677 * deactivate_rsb() happens when all local usage of the rsb is done, i.e. no
678 * more refcounts exist, so the rsb is moved from the keep list to the
679 * toss list.
680 *
681 * rsb's on both keep and toss lists are used for doing a name to master
682 * lookups. rsb's that are in use locally (and being refcounted) are on
683 * the keep list, rsb's that are not in use locally (not refcounted) and
684 * only exist for name/master lookups are on the toss list.
685 *
686 * rsb's on the toss list whose dir_nodeid is not local can have stale
687 * name/master mappings. So, remote requests on such rsb's can potentially
688 * return with an error, which means the mapping is stale and needs to
689 * be updated with a new lookup. (The idea behind MASTER UNCERTAIN and
690 * first_lkid is to keep only a single outstanding request on an rsb
691 * while that rsb has a potentially stale master.)
692 */
693
694 static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
695 uint32_t hash, int dir_nodeid, int from_nodeid,
696 unsigned int flags, struct dlm_rsb **r_ret)
697 {
698 struct dlm_rsb *r = NULL;
699 int our_nodeid = dlm_our_nodeid();
700 int from_local = 0;
701 int from_other = 0;
702 int from_dir = 0;
703 int create = 0;
704 int error;
705
706 if (flags & R_RECEIVE_REQUEST) {
707 if (from_nodeid == dir_nodeid)
708 from_dir = 1;
709 else
710 from_other = 1;
711 } else if (flags & R_REQUEST) {
712 from_local = 1;
713 }
714
715 /*
716 * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
717 * from_nodeid has sent us a lock in dlm_recover_locks, believing
718 * we're the new master. Our local recovery may not have set
719 * res_master_nodeid to our_nodeid yet, so allow either. Don't
720 * create the rsb; dlm_recover_process_copy() will handle EBADR
721 * by resending.
722 *
723 * If someone sends us a request, we are the dir node, and we do
724 * not find the rsb anywhere, then recreate it. This happens if
725 * someone sends us a request after we have removed/freed an rsb.
726 * (They sent a request instead of lookup because they are using
727 * an rsb taken from their scan list.)
728 */
729
730 if (from_local || from_dir ||
731 (from_other && (dir_nodeid == our_nodeid))) {
732 create = 1;
733 }
734
735 retry:
736 error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
737 if (error)
738 goto do_new;
739
740 /* check if the rsb is active under read lock - likely path */
741 read_lock_bh(&ls->ls_rsbtbl_lock);
742 if (!rsb_flag(r, RSB_HASHED)) {
743 read_unlock_bh(&ls->ls_rsbtbl_lock);
744 goto do_new;
745 }
746
747 /*
748 * rsb is active, so we can't check master_nodeid without lock_rsb.
749 */
750
751 if (rsb_flag(r, RSB_INACTIVE)) {
752 read_unlock_bh(&ls->ls_rsbtbl_lock);
753 goto do_inactive;
754 }
755
756 kref_get(&r->res_ref);
757 read_unlock_bh(&ls->ls_rsbtbl_lock);
758 goto out;
759
760
761 do_inactive:
762 write_lock_bh(&ls->ls_rsbtbl_lock);
763
764 /*
765 * The expectation here is that the rsb will have HASHED and
766 * INACTIVE flags set, and that the rsb can be moved from
767 * inactive back to active again. However, between releasing
768 * the read lock and acquiring the write lock, this rsb could
769 * have been removed from rsbtbl, and had HASHED cleared, to
770 * be freed. To deal with this case, we would normally need
771 * to repeat dlm_search_rsb_tree while holding the write lock,
772 * but rcu allows us to simply check the HASHED flag, because
773 * the rcu read lock means the rsb will not be freed yet.
774 * If the HASHED flag is not set, then the rsb is being freed,
775 * so we add a new rsb struct. If the HASHED flag is set,
776 * and INACTIVE is not set, it means another thread has
777 * made the rsb active, as we're expecting to do here, and
778 * we just repeat the lookup (this will be very unlikely.)
779 */
780 if (rsb_flag(r, RSB_HASHED)) {
781 if (!rsb_flag(r, RSB_INACTIVE)) {
782 write_unlock_bh(&ls->ls_rsbtbl_lock);
783 goto retry;
784 }
785 } else {
786 write_unlock_bh(&ls->ls_rsbtbl_lock);
787 goto do_new;
788 }
789
790 /*
791 * rsb found inactive (master_nodeid may be out of date unless
792 * we are the dir_nodeid or were the master) No other thread
793 * is using this rsb because it's inactive, so we can
794 * look at or update res_master_nodeid without lock_rsb.
795 */
796
797 if ((r->res_master_nodeid != our_nodeid) && from_other) {
798 /* our rsb was not master, and another node (not the dir node)
799 has sent us a request */
800 log_debug(ls, "find_rsb inactive from_other %d master %d dir %d %s",
801 from_nodeid, r->res_master_nodeid, dir_nodeid,
802 r->res_name);
803 write_unlock_bh(&ls->ls_rsbtbl_lock);
804 error = -ENOTBLK;
805 goto out;
806 }
807
808 if ((r->res_master_nodeid != our_nodeid) && from_dir) {
809 /* don't think this should ever happen */
810 log_error(ls, "find_rsb inactive from_dir %d master %d",
811 from_nodeid, r->res_master_nodeid);
812 dlm_print_rsb(r);
813 /* fix it and go on */
814 r->res_master_nodeid = our_nodeid;
815 r->res_nodeid = 0;
816 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
817 r->res_first_lkid = 0;
818 }
819
820 if (from_local && (r->res_master_nodeid != our_nodeid)) {
821 /* Because we have held no locks on this rsb,
822 res_master_nodeid could have become stale. */
823 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
824 r->res_first_lkid = 0;
825 }
826
827 /* A dir record will not be on the scan list. */
828 if (r->res_dir_nodeid != our_nodeid)
829 del_scan(ls, r);
830 list_move(&r->res_slow_list, &ls->ls_slow_active);
831 rsb_clear_flag(r, RSB_INACTIVE);
832 kref_init(&r->res_ref); /* ref is now used in active state */
833 write_unlock_bh(&ls->ls_rsbtbl_lock);
834
835 goto out;
836
837
838 do_new:
839 /*
840 * rsb not found
841 */
842
843 if (error == -EBADR && !create)
844 goto out;
845
846 error = get_rsb_struct(ls, name, len, &r);
847 if (WARN_ON_ONCE(error))
848 goto out;
849
850 r->res_hash = hash;
851 r->res_dir_nodeid = dir_nodeid;
852 kref_init(&r->res_ref);
853
854 if (from_dir) {
855 /* want to see how often this happens */
856 log_debug(ls, "find_rsb new from_dir %d recreate %s",
857 from_nodeid, r->res_name);
858 r->res_master_nodeid = our_nodeid;
859 r->res_nodeid = 0;
860 goto out_add;
861 }
862
863 if (from_other && (dir_nodeid != our_nodeid)) {
864 /* should never happen */
865 log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
866 from_nodeid, dir_nodeid, our_nodeid, r->res_name);
867 dlm_free_rsb(r);
868 r = NULL;
869 error = -ENOTBLK;
870 goto out;
871 }
872
873 if (from_other) {
874 log_debug(ls, "find_rsb new from_other %d dir %d %s",
875 from_nodeid, dir_nodeid, r->res_name);
876 }
877
878 if (dir_nodeid == our_nodeid) {
879 /* When we are the dir nodeid, we can set the master
880 node immediately */
881 r->res_master_nodeid = our_nodeid;
882 r->res_nodeid = 0;
883 } else {
884 /* set_master will send_lookup to dir_nodeid */
885 r->res_master_nodeid = 0;
886 r->res_nodeid = -1;
887 }
888
889 out_add:
890
891 write_lock_bh(&ls->ls_rsbtbl_lock);
892 error = rsb_insert(r, &ls->ls_rsbtbl);
893 if (error == -EEXIST) {
894 /* somebody else was faster and the rsb
895  * exists now, so do a full relookup
896  */
897 write_unlock_bh(&ls->ls_rsbtbl_lock);
898 dlm_free_rsb(r);
899 goto retry;
900 } else if (!error) {
901 list_add(&r->res_slow_list, &ls->ls_slow_active);
902 }
903 write_unlock_bh(&ls->ls_rsbtbl_lock);
904 out:
905 *r_ret = r;
906 return error;
907 }
908
909 /* During recovery, other nodes can send us new MSTCPY locks (from
910 dlm_recover_locks) before we've made ourselves master (in
911 dlm_recover_masters). */
912
913 static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
914 uint32_t hash, int dir_nodeid, int from_nodeid,
915 unsigned int flags, struct dlm_rsb **r_ret)
916 {
917 struct dlm_rsb *r = NULL;
918 int our_nodeid = dlm_our_nodeid();
919 int recover = (flags & R_RECEIVE_RECOVER);
920 int error;
921
922 retry:
923 error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
924 if (error)
925 goto do_new;
926
927 /* check if the rsb is in active state under read lock - likely path */
928 read_lock_bh(&ls->ls_rsbtbl_lock);
929 if (!rsb_flag(r, RSB_HASHED)) {
930 read_unlock_bh(&ls->ls_rsbtbl_lock);
931 goto do_new;
932 }
933
934 if (rsb_flag(r, RSB_INACTIVE)) {
935 read_unlock_bh(&ls->ls_rsbtbl_lock);
936 goto do_inactive;
937 }
938
939 /*
940 * rsb is active, so we can't check master_nodeid without lock_rsb.
941 */
942
943 kref_get(&r->res_ref);
944 read_unlock_bh(&ls->ls_rsbtbl_lock);
945
946 goto out;
947
948
949 do_inactive:
950 write_lock_bh(&ls->ls_rsbtbl_lock);
951
952 /* See comment in find_rsb_dir. */
953 if (rsb_flag(r, RSB_HASHED)) {
954 if (!rsb_flag(r, RSB_INACTIVE)) {
955 write_unlock_bh(&ls->ls_rsbtbl_lock);
956 goto retry;
957 }
958 } else {
959 write_unlock_bh(&ls->ls_rsbtbl_lock);
960 goto do_new;
961 }
962
963
964 /*
965 * rsb found inactive. No other thread is using this rsb because
966 * it's inactive, so we can look at or update res_master_nodeid
967 * without lock_rsb.
968 */
969
970 if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
971 /* our rsb is not master, and another node has sent us a
972 request; this should never happen */
973 log_error(ls, "find_rsb inactive from_nodeid %d master %d dir %d",
974 from_nodeid, r->res_master_nodeid, dir_nodeid);
975 dlm_print_rsb(r);
976 write_unlock_bh(&ls->ls_rsbtbl_lock);
977 error = -ENOTBLK;
978 goto out;
979 }
980
981 if (!recover && (r->res_master_nodeid != our_nodeid) &&
982 (dir_nodeid == our_nodeid)) {
983 /* our rsb is not master, and we are dir; may as well fix it;
984 this should never happen */
985 log_error(ls, "find_rsb inactive our %d master %d dir %d",
986 our_nodeid, r->res_master_nodeid, dir_nodeid);
987 dlm_print_rsb(r);
988 r->res_master_nodeid = our_nodeid;
989 r->res_nodeid = 0;
990 }
991
992 list_move(&r->res_slow_list, &ls->ls_slow_active);
993 rsb_clear_flag(r, RSB_INACTIVE);
994 kref_init(&r->res_ref);
995 del_scan(ls, r);
996 write_unlock_bh(&ls->ls_rsbtbl_lock);
997
998 goto out;
999
1000
1001 do_new:
1002 /*
1003 * rsb not found
1004 */
1005
1006 error = get_rsb_struct(ls, name, len, &r);
1007 if (WARN_ON_ONCE(error))
1008 goto out;
1009
1010 r->res_hash = hash;
1011 r->res_dir_nodeid = dir_nodeid;
1012 r->res_master_nodeid = dir_nodeid;
1013 r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
1014 kref_init(&r->res_ref);
1015
1016 write_lock_bh(&ls->ls_rsbtbl_lock);
1017 error = rsb_insert(r, &ls->ls_rsbtbl);
1018 if (error == -EEXIST) {
1019 /* somebody else was faster and the rsb
1020  * exists now, so do a full relookup
1021  */
1022 write_unlock_bh(&ls->ls_rsbtbl_lock);
1023 dlm_free_rsb(r);
1024 goto retry;
1025 } else if (!error) {
1026 list_add(&r->res_slow_list, &ls->ls_slow_active);
1027 }
1028 write_unlock_bh(&ls->ls_rsbtbl_lock);
1029
1030 out:
1031 *r_ret = r;
1032 return error;
1033 }
1034
1035 /*
1036 * rsb rcu usage
1037 *
1038 * While rcu read lock is held, the rsb cannot be freed,
1039 * which allows a lookup optimization.
1040 *
1041 * Two threads are accessing the same rsb concurrently:
1042 * the first (A) is trying to use the rsb, while the second (B)
1043 * is trying to free the rsb.
1044 *
1045 * thread A thread B
1046 * (trying to use rsb) (trying to free rsb)
1047 *
1048 * A1. rcu read lock
1049 * A2. rsbtbl read lock
1050 * A3. look up rsb in rsbtbl
1051 * A4. rsbtbl read unlock
1052 * B1. rsbtbl write lock
1053 * B2. look up rsb in rsbtbl
1054 * B3. remove rsb from rsbtbl
1055 * B4. clear rsb HASHED flag
1056 * B5. rsbtbl write unlock
1057 * B6. begin freeing rsb using rcu...
1058 *
1059 * (rsb is inactive, so try to make it active again)
1060 * A5. read rsb HASHED flag (safe because rsb is not freed yet)
1061 * A6. the rsb HASHED flag is not set, which means the rsb
1062 * is being removed from rsbtbl and freed, so don't use it.
1063 * A7. rcu read unlock
1064 *
1065 * B7. ...finish freeing rsb using rcu
1066 * A8. create a new rsb
1067 *
1068 * Without the rcu optimization, steps A5-8 would need to do
1069 * an extra rsbtbl lookup:
1070 * A5. rsbtbl write lock
1071 * A6. look up rsb in rsbtbl, not found
1072 * A7. rsbtbl write unlock
1073 * A8. create a new rsb
1074 */
1075
1076 static int find_rsb(struct dlm_ls *ls, const void *name, int len,
1077 int from_nodeid, unsigned int flags,
1078 struct dlm_rsb **r_ret)
1079 {
1080 int dir_nodeid;
1081 uint32_t hash;
1082 int rv;
1083
1084 if (len > DLM_RESNAME_MAXLEN)
1085 return -EINVAL;
1086
1087 hash = jhash(name, len, 0);
1088 dir_nodeid = dlm_hash2nodeid(ls, hash);
1089
1090 rcu_read_lock();
1091 if (dlm_no_directory(ls))
1092 rv = find_rsb_nodir(ls, name, len, hash, dir_nodeid,
1093 from_nodeid, flags, r_ret);
1094 else
1095 rv = find_rsb_dir(ls, name, len, hash, dir_nodeid,
1096 from_nodeid, flags, r_ret);
1097 rcu_read_unlock();
1098 return rv;
1099 }
1100
1101 /* we have received a request and found that res_master_nodeid != our_nodeid,
1102 so we need to return an error or make ourselves the master */
1103
1104 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
1105 int from_nodeid)
1106 {
1107 if (dlm_no_directory(ls)) {
1108 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
1109 from_nodeid, r->res_master_nodeid,
1110 r->res_dir_nodeid);
1111 dlm_print_rsb(r);
1112 return -ENOTBLK;
1113 }
1114
1115 if (from_nodeid != r->res_dir_nodeid) {
1116 /* our rsb is not master, and another node (not the dir node)
1117 has sent us a request. this is much more common when our
1118 master_nodeid is zero, so limit debug to non-zero. */
1119
1120 if (r->res_master_nodeid) {
1121 log_debug(ls, "validate master from_other %d master %d "
1122 "dir %d first %x %s", from_nodeid,
1123 r->res_master_nodeid, r->res_dir_nodeid,
1124 r->res_first_lkid, r->res_name);
1125 }
1126 return -ENOTBLK;
1127 } else {
1128 /* our rsb is not master, but the dir nodeid has sent us a
1129 request; this could happen with master 0 / res_nodeid -1 */
1130
1131 if (r->res_master_nodeid) {
1132 log_error(ls, "validate master from_dir %d master %d "
1133 "first %x %s",
1134 from_nodeid, r->res_master_nodeid,
1135 r->res_first_lkid, r->res_name);
1136 }
1137
1138 r->res_master_nodeid = dlm_our_nodeid();
1139 r->res_nodeid = 0;
1140 return 0;
1141 }
1142 }
1143
1144 static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
1145 int from_nodeid, bool is_inactive, unsigned int flags,
1146 int *r_nodeid, int *result)
1147 {
1148 int fix_master = (flags & DLM_LU_RECOVER_MASTER);
1149 int from_master = (flags & DLM_LU_RECOVER_DIR);
1150
1151 if (r->res_dir_nodeid != our_nodeid) {
1152 /* should not happen, but may as well fix it and carry on */
1153 log_error(ls, "%s res_dir %d our %d %s", __func__,
1154 r->res_dir_nodeid, our_nodeid, r->res_name);
1155 r->res_dir_nodeid = our_nodeid;
1156 }
1157
1158 if (fix_master && r->res_master_nodeid && dlm_is_removed(ls, r->res_master_nodeid)) {
1159 /* Recovery uses this function to set a new master when
1160 * the previous master failed. Setting NEW_MASTER will
1161 * force dlm_recover_masters to call recover_master on this
1162 * rsb even though the res_nodeid is no longer removed.
1163 */
1164
1165 r->res_master_nodeid = from_nodeid;
1166 r->res_nodeid = from_nodeid;
1167 rsb_set_flag(r, RSB_NEW_MASTER);
1168
1169 if (is_inactive) {
1170 /* I don't think we should ever find it inactive. */
1171 log_error(ls, "%s fix_master inactive", __func__);
1172 dlm_dump_rsb(r);
1173 }
1174 }
1175
1176 if (from_master && (r->res_master_nodeid != from_nodeid)) {
1177 /* this will happen if from_nodeid became master during
1178 * a previous recovery cycle, and we aborted the previous
1179 * cycle before recovering this master value
1180 */
1181
1182 log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
1183 __func__, from_nodeid, r->res_master_nodeid,
1184 r->res_nodeid, r->res_first_lkid, r->res_name);
1185
1186 if (r->res_master_nodeid == our_nodeid) {
1187 log_error(ls, "from_master %d our_master", from_nodeid);
1188 dlm_dump_rsb(r);
1189 goto ret_assign;
1190 }
1191
1192 r->res_master_nodeid = from_nodeid;
1193 r->res_nodeid = from_nodeid;
1194 rsb_set_flag(r, RSB_NEW_MASTER);
1195 }
1196
1197 if (!r->res_master_nodeid) {
1198 /* this will happen if recovery happens while we're looking
1199 * up the master for this rsb
1200 */
1201
1202 log_debug(ls, "%s master 0 to %d first %x %s", __func__,
1203 from_nodeid, r->res_first_lkid, r->res_name);
1204 r->res_master_nodeid = from_nodeid;
1205 r->res_nodeid = from_nodeid;
1206 }
1207
1208 if (!from_master && !fix_master &&
1209 (r->res_master_nodeid == from_nodeid)) {
1210 /* this can happen when the master sends remove, the dir node
1211 * finds the rsb on the active list and ignores the remove,
1212 * and the former master sends a lookup
1213 */
1214
1215 log_limit(ls, "%s from master %d flags %x first %x %s",
1216 __func__, from_nodeid, flags, r->res_first_lkid,
1217 r->res_name);
1218 }
1219
1220 ret_assign:
1221 *r_nodeid = r->res_master_nodeid;
1222 if (result)
1223 *result = DLM_LU_MATCH;
1224 }
1225
1226 /*
1227 * We're the dir node for this res and another node wants to know the
1228 * master nodeid. During normal operation (non recovery) this is only
1229 * called from receive_lookup(); master lookups when the local node is
1230 * the dir node are done by find_rsb().
1231 *
1232 * normal operation, we are the dir node for a resource
1233 * . _request_lock
1234 * . set_master
1235 * . send_lookup
1236 * . receive_lookup
1237 * . dlm_master_lookup flags 0
1238 *
1239 * recover directory, we are rebuilding dir for all resources
1240 * . dlm_recover_directory
1241 * . dlm_rcom_names
1242 * remote node sends back the rsb names it is master of and we are dir of
1243 * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
1244 * we either create new rsb setting remote node as master, or find existing
1245 * rsb and set master to be the remote node.
1246 *
1247 * recover masters, we are finding the new master for resources
1248 * . dlm_recover_masters
1249 * . recover_master
1250 * . dlm_send_rcom_lookup
1251 * . receive_rcom_lookup
1252 * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
1253 */
1254
1255 static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
1256 int len, unsigned int flags, int *r_nodeid, int *result)
1257 {
1258 struct dlm_rsb *r = NULL;
1259 uint32_t hash;
1260 int our_nodeid = dlm_our_nodeid();
1261 int dir_nodeid, error;
1262
1263 if (len > DLM_RESNAME_MAXLEN)
1264 return -EINVAL;
1265
1266 if (from_nodeid == our_nodeid) {
1267 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
1268 our_nodeid, flags);
1269 return -EINVAL;
1270 }
1271
1272 hash = jhash(name, len, 0);
1273 dir_nodeid = dlm_hash2nodeid(ls, hash);
1274 if (dir_nodeid != our_nodeid) {
1275 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
1276 from_nodeid, dir_nodeid, our_nodeid, hash,
1277 ls->ls_num_nodes);
1278 *r_nodeid = -1;
1279 return -EINVAL;
1280 }
1281
1282 retry:
1283 error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1284 if (error)
1285 goto not_found;
1286
1287 /* check if the rsb is active under read lock - likely path */
1288 read_lock_bh(&ls->ls_rsbtbl_lock);
1289 if (!rsb_flag(r, RSB_HASHED)) {
1290 read_unlock_bh(&ls->ls_rsbtbl_lock);
1291 goto not_found;
1292 }
1293
1294 if (rsb_flag(r, RSB_INACTIVE)) {
1295 read_unlock_bh(&ls->ls_rsbtbl_lock);
1296 goto do_inactive;
1297 }
1298
1299 /* because the rsb is active, we need to lock_rsb before
1300 * checking/changing res_master_nodeid
1301 */
1302
1303 hold_rsb(r);
1304 read_unlock_bh(&ls->ls_rsbtbl_lock);
1305 lock_rsb(r);
1306
1307 __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
1308 flags, r_nodeid, result);
1309
1310 /* the rsb was active */
1311 unlock_rsb(r);
1312 put_rsb(r);
1313
1314 return 0;
1315
1316 do_inactive:
1317 /* unlikely path - check if still part of ls_rsbtbl */
1318 write_lock_bh(&ls->ls_rsbtbl_lock);
1319
1320 /* see comment in find_rsb_dir */
1321 if (rsb_flag(r, RSB_HASHED)) {
1322 if (!rsb_flag(r, RSB_INACTIVE)) {
1323 write_unlock_bh(&ls->ls_rsbtbl_lock);
1324 /* something has changed, very unlikely but
1325 * try again
1326 */
1327 goto retry;
1328 }
1329 } else {
1330 write_unlock_bh(&ls->ls_rsbtbl_lock);
1331 goto not_found;
1332 }
1333
1334 /* because the rsb is inactive, it's not refcounted and lock_rsb
1335 is not used, but is protected by the rsbtbl lock */
1336
1337 __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
1338 r_nodeid, result);
1339
1340 /* A dir record rsb should never be on scan list. */
1341 /* Try to fix this with del_scan? */
1342 WARN_ON(!list_empty(&r->res_scan_list));
1343
1344 write_unlock_bh(&ls->ls_rsbtbl_lock);
1345
1346 return 0;
1347
1348 not_found:
1349 error = get_rsb_struct(ls, name, len, &r);
1350 if (WARN_ON_ONCE(error))
1351 goto out;
1352
1353 r->res_hash = hash;
1354 r->res_dir_nodeid = our_nodeid;
1355 r->res_master_nodeid = from_nodeid;
1356 r->res_nodeid = from_nodeid;
1357 rsb_set_flag(r, RSB_INACTIVE);
1358
1359 write_lock_bh(&ls->ls_rsbtbl_lock);
1360 error = rsb_insert(r, &ls->ls_rsbtbl);
1361 if (error == -EEXIST) {
1362 /* somebody else was faster and the rsb
1363  * exists now, so do a full relookup
1364  */
1365 write_unlock_bh(&ls->ls_rsbtbl_lock);
1366 dlm_free_rsb(r);
1367 goto retry;
1368 } else if (error) {
1369 write_unlock_bh(&ls->ls_rsbtbl_lock);
1370 /* should never happen */
1371 dlm_free_rsb(r);
1372 goto retry;
1373 }
1374
1375 list_add(&r->res_slow_list, &ls->ls_slow_inactive);
1376 write_unlock_bh(&ls->ls_rsbtbl_lock);
1377
1378 if (result)
1379 *result = DLM_LU_ADD;
1380 *r_nodeid = from_nodeid;
1381 out:
1382 return error;
1383 }
1384
1385 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
1386 int len, unsigned int flags, int *r_nodeid, int *result)
1387 {
1388 int rv;
1389 rcu_read_lock();
1390 rv = _dlm_master_lookup(ls, from_nodeid, name, len, flags, r_nodeid, result);
1391 rcu_read_unlock();
1392 return rv;
1393 }
1394
1395 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1396 {
1397 struct dlm_rsb *r;
1398
1399 read_lock_bh(&ls->ls_rsbtbl_lock);
1400 list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
1401 if (r->res_hash == hash)
1402 dlm_dump_rsb(r);
1403 }
1404 read_unlock_bh(&ls->ls_rsbtbl_lock);
1405 }
1406
1407 void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
1408 {
1409 struct dlm_rsb *r = NULL;
1410 int error;
1411
1412 rcu_read_lock();
1413 error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1414 if (error)
1415 goto out;
1416
1417 dlm_dump_rsb(r);
1418 out:
1419 rcu_read_unlock();
1420 }
1421
1422 static void deactivate_rsb(struct kref *kref)
1423 {
1424 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1425 struct dlm_ls *ls = r->res_ls;
1426 int our_nodeid = dlm_our_nodeid();
1427
1428 DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1429 rsb_set_flag(r, RSB_INACTIVE);
1430 list_move(&r->res_slow_list, &ls->ls_slow_inactive);
1431
1432 /*
1433 * When the rsb becomes unused:
1434 * - If it's not a dir record for a remote master rsb,
1435 * then it is put on the scan list to be freed.
1436 * - If it's a dir record for a remote master rsb,
1437 * then it is kept in the inactive state until
1438 * receive_remove() from the master node.
1439 */
1440 if (!dlm_no_directory(ls) &&
1441 (r->res_master_nodeid != our_nodeid) &&
1442 (dlm_dir_nodeid(r) != our_nodeid))
1443 add_scan(ls, r);
1444
1445 if (r->res_lvbptr) {
1446 dlm_free_lvb(r->res_lvbptr);
1447 r->res_lvbptr = NULL;
1448 }
1449 }
1450
1451 void free_inactive_rsb(struct dlm_rsb *r)
1452 {
1453 WARN_ON_ONCE(!rsb_flag(r, RSB_INACTIVE));
1454
1455 DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1456 DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1457 DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1458 DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1459 DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1460 DLM_ASSERT(list_empty(&r->res_scan_list), dlm_dump_rsb(r););
1461 DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1462 DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r););
1463
1464 dlm_free_rsb(r);
1465 }
1466
1467 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1468 The rsb must exist as long as any lkb's for it do. */
1469
1470 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1471 {
1472 hold_rsb(r);
1473 lkb->lkb_resource = r;
1474 }
1475
1476 static void detach_lkb(struct dlm_lkb *lkb)
1477 {
1478 if (lkb->lkb_resource) {
1479 put_rsb(lkb->lkb_resource);
1480 lkb->lkb_resource = NULL;
1481 }
1482 }
1483
1484 static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
1485 unsigned long start, unsigned long end)
1486 {
1487 struct xa_limit limit;
1488 struct dlm_lkb *lkb;
1489 int rv;
1490
1491 limit.max = end;
1492 limit.min = start;
1493
1494 lkb = dlm_allocate_lkb();
1495 if (!lkb)
1496 return -ENOMEM;
1497
1498 lkb->lkb_last_bast_cb_mode = DLM_LOCK_IV;
1499 lkb->lkb_last_cast_cb_mode = DLM_LOCK_IV;
1500 lkb->lkb_last_cb_mode = DLM_LOCK_IV;
1501 lkb->lkb_nodeid = -1;
1502 lkb->lkb_grmode = DLM_LOCK_IV;
1503 kref_init(&lkb->lkb_ref);
1504 INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1505 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1506
1507 write_lock_bh(&ls->ls_lkbxa_lock);
1508 rv = xa_alloc(&ls->ls_lkbxa, &lkb->lkb_id, lkb, limit, GFP_ATOMIC);
1509 write_unlock_bh(&ls->ls_lkbxa_lock);
1510
1511 if (rv < 0) {
1512 log_error(ls, "create_lkb xa error %d", rv);
1513 dlm_free_lkb(lkb);
1514 return rv;
1515 }
1516
1517 *lkb_ret = lkb;
1518 return 0;
1519 }
1520
1521 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1522 {
1523 return _create_lkb(ls, lkb_ret, 1, ULONG_MAX);
1524 }
1525
1526 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1527 {
1528 struct dlm_lkb *lkb;
1529
1530 rcu_read_lock();
1531 lkb = xa_load(&ls->ls_lkbxa, lkid);
1532 if (lkb) {
1533 /* check if lkb is still part of lkbxa under lkbxa_lock as
1534 * the lkb_ref is tied to the lkbxa data structure, see
1535 * __put_lkb().
1536 */
1537 read_lock_bh(&ls->ls_lkbxa_lock);
1538 if (kref_read(&lkb->lkb_ref))
1539 kref_get(&lkb->lkb_ref);
1540 else
1541 lkb = NULL;
1542 read_unlock_bh(&ls->ls_lkbxa_lock);
1543 }
1544 rcu_read_unlock();
1545
1546 *lkb_ret = lkb;
1547 return lkb ? 0 : -ENOENT;
1548 }
1549
1550 static void kill_lkb(struct kref *kref)
1551 {
1552 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1553
1554 /* All work is done after the return from kref_put() so we
1555 can release the write_lock before the detach_lkb */
1556
1557 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1558 }
1559
1560 /* __put_lkb() is used when an lkb may not have an rsb attached to
1561 it so we need to provide the lockspace explicitly */
1562
1563 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1564 {
1565 uint32_t lkid = lkb->lkb_id;
1566 int rv;
1567
1568 rv = dlm_kref_put_write_lock_bh(&lkb->lkb_ref, kill_lkb,
1569 &ls->ls_lkbxa_lock);
1570 if (rv) {
1571 xa_erase(&ls->ls_lkbxa, lkid);
1572 write_unlock_bh(&ls->ls_lkbxa_lock);
1573
1574 detach_lkb(lkb);
1575
1576 /* for local/process lkbs, lvbptr points to caller's lksb */
1577 if (lkb->lkb_lvbptr && is_master_copy(lkb))
1578 dlm_free_lvb(lkb->lkb_lvbptr);
1579 dlm_free_lkb(lkb);
1580 }
1581
1582 return rv;
1583 }
1584
1585 int dlm_put_lkb(struct dlm_lkb *lkb)
1586 {
1587 struct dlm_ls *ls;
1588
1589 DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1590 DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1591
1592 ls = lkb->lkb_resource->res_ls;
1593 return __put_lkb(ls, lkb);
1594 }
1595
1596 /* This is only called to add a reference when the code already holds
1597 a valid reference to the lkb, so there's no need for locking. */
1598
1599 static inline void hold_lkb(struct dlm_lkb *lkb)
1600 {
1601 kref_get(&lkb->lkb_ref);
1602 }
1603
1604 static void unhold_lkb_assert(struct kref *kref)
1605 {
1606 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1607
1608 DLM_ASSERT(false, dlm_print_lkb(lkb););
1609 }
1610
1611 /* This is called when we need to remove a reference and are certain
1612 it's not the last ref. e.g. del_lkb is always called between a
1613 find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1614 put_lkb would work fine, but would involve unnecessary locking */
1615
1616 static inline void unhold_lkb(struct dlm_lkb *lkb)
1617 {
1618 kref_put(&lkb->lkb_ref, unhold_lkb_assert);
1619 }
1620
1621 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1622 int mode)
1623 {
1624 struct dlm_lkb *lkb = NULL, *iter;
1625
1626 list_for_each_entry(iter, head, lkb_statequeue)
1627 if (iter->lkb_rqmode < mode) {
1628 lkb = iter;
1629 list_add_tail(new, &iter->lkb_statequeue);
1630 break;
1631 }
1632
1633 if (!lkb)
1634 list_add_tail(new, head);
1635 }
1636
1637 /* add/remove lkb to rsb's grant/convert/wait queue */
1638
1639 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1640 {
1641 kref_get(&lkb->lkb_ref);
1642
1643 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1644
1645 lkb->lkb_timestamp = ktime_get();
1646
1647 lkb->lkb_status = status;
1648
1649 switch (status) {
1650 case DLM_LKSTS_WAITING:
1651 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1652 list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1653 else
1654 list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1655 break;
1656 case DLM_LKSTS_GRANTED:
1657 /* convention says granted locks kept in order of grmode */
1658 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1659 lkb->lkb_grmode);
1660 break;
1661 case DLM_LKSTS_CONVERT:
1662 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1663 list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1664 else
1665 list_add_tail(&lkb->lkb_statequeue,
1666 &r->res_convertqueue);
1667 break;
1668 default:
1669 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1670 }
1671 }
1672
1673 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1674 {
1675 lkb->lkb_status = 0;
1676 list_del(&lkb->lkb_statequeue);
1677 unhold_lkb(lkb);
1678 }
1679
1680 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1681 {
1682 del_lkb(r, lkb);
1683 add_lkb(r, lkb, sts);
1684 }
1685
1686 static int msg_reply_type(int mstype)
1687 {
1688 switch (mstype) {
1689 case DLM_MSG_REQUEST:
1690 return DLM_MSG_REQUEST_REPLY;
1691 case DLM_MSG_CONVERT:
1692 return DLM_MSG_CONVERT_REPLY;
1693 case DLM_MSG_UNLOCK:
1694 return DLM_MSG_UNLOCK_REPLY;
1695 case DLM_MSG_CANCEL:
1696 return DLM_MSG_CANCEL_REPLY;
1697 case DLM_MSG_LOOKUP:
1698 return DLM_MSG_LOOKUP_REPLY;
1699 }
1700 return -1;
1701 }
1702
1703 /* add/remove lkb from global waiters list of lkb's waiting for
1704 a reply from a remote node */
1705
1706 static void add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1707 {
1708 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1709
1710 spin_lock_bh(&ls->ls_waiters_lock);
1711 if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1712 switch (mstype) {
1713 case DLM_MSG_UNLOCK:
1714 set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
1715 break;
1716 case DLM_MSG_CANCEL:
1717 set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
1718 break;
1719 default:
1720 /* should never happen as validate_lock_args() checks
1721 * on lkb_wait_type and validate_unlock_args() only
1722 * creates UNLOCK or CANCEL messages.
1723 */
1724 WARN_ON_ONCE(1);
1725 goto out;
1726 }
1727 lkb->lkb_wait_count++;
1728 hold_lkb(lkb);
1729
1730 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1731 lkb->lkb_id, lkb->lkb_wait_type, mstype,
1732 lkb->lkb_wait_count, dlm_iflags_val(lkb));
1733 goto out;
1734 }
1735
1736 DLM_ASSERT(!lkb->lkb_wait_count,
1737 dlm_print_lkb(lkb);
1738 printk("wait_count %d\n", lkb->lkb_wait_count););
1739
1740 lkb->lkb_wait_count++;
1741 lkb->lkb_wait_type = mstype;
1742 lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1743 hold_lkb(lkb);
1744 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1745 out:
1746 spin_unlock_bh(&ls->ls_waiters_lock);
1747 }
1748
1749 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1750 list as part of process_requestqueue (e.g. a lookup that has an optimized
1751 request reply on the requestqueue) between dlm_recover_waiters_pre() which
1752 set RESEND and dlm_recover_waiters_post() */
1753
1754 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1755 const struct dlm_message *ms)
1756 {
1757 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1758 int overlap_done = 0;
1759
1760 if (mstype == DLM_MSG_UNLOCK_REPLY &&
1761 test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
1762 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1763 overlap_done = 1;
1764 goto out_del;
1765 }
1766
1767 if (mstype == DLM_MSG_CANCEL_REPLY &&
1768 test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1769 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1770 overlap_done = 1;
1771 goto out_del;
1772 }
1773
1774 /* Cancel state was preemptively cleared by a successful convert,
1775 see next comment, nothing to do. */
1776
1777 if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1778 (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1779 log_debug(ls, "remwait %x cancel_reply wait_type %d",
1780 lkb->lkb_id, lkb->lkb_wait_type);
1781 return -1;
1782 }
1783
1784 	/* Remove for the convert reply, and preemptively remove for the
1785 cancel reply. A convert has been granted while there's still
1786 an outstanding cancel on it (the cancel is moot and the result
1787 in the cancel reply should be 0). We preempt the cancel reply
1788 because the app gets the convert result and then can follow up
1789 with another op, like convert. This subsequent op would see the
1790 lingering state of the cancel and fail with -EBUSY. */
1791
1792 if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1793 (lkb->lkb_wait_type == DLM_MSG_CONVERT) && ms && !ms->m_result &&
1794 test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1795 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1796 lkb->lkb_id);
1797 lkb->lkb_wait_type = 0;
1798 lkb->lkb_wait_count--;
1799 unhold_lkb(lkb);
1800 goto out_del;
1801 }
1802
1803 /* N.B. type of reply may not always correspond to type of original
1804 msg due to lookup->request optimization, verify others? */
1805
1806 if (lkb->lkb_wait_type) {
1807 lkb->lkb_wait_type = 0;
1808 goto out_del;
1809 }
1810
1811 log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1812 lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
1813 lkb->lkb_remid, mstype, dlm_iflags_val(lkb));
1814 return -1;
1815
1816 out_del:
1817 	/* the force-unlock/cancel has completed and we haven't received a reply
1818 to the op that was in progress prior to the unlock/cancel; we
1819 give up on any reply to the earlier op. FIXME: not sure when/how
1820 this would happen */
1821
1822 if (overlap_done && lkb->lkb_wait_type) {
1823 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1824 lkb->lkb_id, mstype, lkb->lkb_wait_type);
1825 lkb->lkb_wait_count--;
1826 unhold_lkb(lkb);
1827 lkb->lkb_wait_type = 0;
1828 }
1829
1830 DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1831
1832 clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
1833 lkb->lkb_wait_count--;
1834 if (!lkb->lkb_wait_count)
1835 list_del_init(&lkb->lkb_wait_reply);
1836 unhold_lkb(lkb);
1837 return 0;
1838 }
1839
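/* take the waiters lock and remove the lkb from the waiters list; used on
   paths where there is no reply message to process (e.g. when sending the
   message failed) */
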
1840 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1841 {
1842 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1843 int error;
1844
1845 spin_lock_bh(&ls->ls_waiters_lock);
1846 error = _remove_from_waiters(lkb, mstype, NULL);
1847 spin_unlock_bh(&ls->ls_waiters_lock);
1848 return error;
1849 }
1850
1851 /* Handles situations where we might be processing a "fake" or "local" reply in
1852  * the recovery context, which stops any locking activity. Only debugfs might
1853  * change the lockspace waiters, but it will hold the recovery lock to ensure
1854  * that remove_from_waiters_ms() in the local case is the only user manipulating
1855  * the lockspace waiters in the recovery context.
1856  */
1857
1858 static int remove_from_waiters_ms(struct dlm_lkb *lkb,
1859 const struct dlm_message *ms, bool local)
1860 {
1861 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1862 int error;
1863
1864 if (!local)
1865 spin_lock_bh(&ls->ls_waiters_lock);
1866 else
1867 WARN_ON_ONCE(!rwsem_is_locked(&ls->ls_in_recovery) ||
1868 !dlm_locking_stopped(ls));
1869 error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
1870 if (!local)
1871 spin_unlock_bh(&ls->ls_waiters_lock);
1872 return error;
1873 }
1874
1875 /* lkb is master or local copy */
1876
1877 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1878 {
1879 int b, len = r->res_ls->ls_lvblen;
1880
1881 /* b=1 lvb returned to caller
1882 b=0 lvb written to rsb or invalidated
1883 b=-1 do nothing */
1884
1885 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1886
1887 if (b == 1) {
1888 if (!lkb->lkb_lvbptr)
1889 return;
1890
1891 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1892 return;
1893
1894 if (!r->res_lvbptr)
1895 return;
1896
1897 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1898 lkb->lkb_lvbseq = r->res_lvbseq;
1899
1900 } else if (b == 0) {
1901 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1902 rsb_set_flag(r, RSB_VALNOTVALID);
1903 return;
1904 }
1905
1906 if (!lkb->lkb_lvbptr)
1907 return;
1908
1909 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1910 return;
1911
1912 if (!r->res_lvbptr)
1913 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1914
1915 if (!r->res_lvbptr)
1916 return;
1917
1918 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1919 r->res_lvbseq++;
1920 lkb->lkb_lvbseq = r->res_lvbseq;
1921 rsb_clear_flag(r, RSB_VALNOTVALID);
1922 }
1923
1924 if (rsb_flag(r, RSB_VALNOTVALID))
1925 set_bit(DLM_SBF_VALNOTVALID_BIT, &lkb->lkb_sbflags);
1926 }
1927
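/* on unlock, write the lkb's lvb back to the rsb if the lock was held in PW
   or EX mode, or invalidate the rsb's lvb if IVVALBLK was requested */
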
1928 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1929 {
1930 if (lkb->lkb_grmode < DLM_LOCK_PW)
1931 return;
1932
1933 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1934 rsb_set_flag(r, RSB_VALNOTVALID);
1935 return;
1936 }
1937
1938 if (!lkb->lkb_lvbptr)
1939 return;
1940
1941 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1942 return;
1943
1944 if (!r->res_lvbptr)
1945 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1946
1947 if (!r->res_lvbptr)
1948 return;
1949
1950 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1951 r->res_lvbseq++;
1952 rsb_clear_flag(r, RSB_VALNOTVALID);
1953 }
1954
1955 /* lkb is process copy (pc) */
1956
1957 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1958 const struct dlm_message *ms)
1959 {
1960 int b;
1961
1962 if (!lkb->lkb_lvbptr)
1963 return;
1964
1965 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1966 return;
1967
1968 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1969 if (b == 1) {
1970 int len = receive_extralen(ms);
1971 if (len > r->res_ls->ls_lvblen)
1972 len = r->res_ls->ls_lvblen;
1973 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1974 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
1975 }
1976 }
1977
1978 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1979 remove_lock -- used for unlock, removes lkb from granted
1980 revert_lock -- used for cancel, moves lkb from convert to granted
1981 grant_lock -- used for request and convert, adds lkb to granted or
1982 moves lkb from convert or waiting to granted
1983
1984 Each of these is used for master or local copy lkb's. There is
1985 also a _pc() variation used to make the corresponding change on
1986 a process copy (pc) lkb. */
1987
1988 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1989 {
1990 del_lkb(r, lkb);
1991 lkb->lkb_grmode = DLM_LOCK_IV;
1992 /* this unhold undoes the original ref from create_lkb()
1993 so this leads to the lkb being freed */
1994 unhold_lkb(lkb);
1995 }
1996
1997 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1998 {
1999 set_lvb_unlock(r, lkb);
2000 _remove_lock(r, lkb);
2001 }
2002
2003 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2004 {
2005 _remove_lock(r, lkb);
2006 }
2007
2008 /* returns: 0 did nothing
2009 1 moved lock to granted
2010 -1 removed lock */
2011
2012 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2013 {
2014 int rv = 0;
2015
2016 lkb->lkb_rqmode = DLM_LOCK_IV;
2017
2018 switch (lkb->lkb_status) {
2019 case DLM_LKSTS_GRANTED:
2020 break;
2021 case DLM_LKSTS_CONVERT:
2022 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2023 rv = 1;
2024 break;
2025 case DLM_LKSTS_WAITING:
2026 del_lkb(r, lkb);
2027 lkb->lkb_grmode = DLM_LOCK_IV;
2028 /* this unhold undoes the original ref from create_lkb()
2029 so this leads to the lkb being freed */
2030 unhold_lkb(lkb);
2031 rv = -1;
2032 break;
2033 default:
2034 log_print("invalid status for revert %d", lkb->lkb_status);
2035 }
2036 return rv;
2037 }
2038
2039 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2040 {
2041 return revert_lock(r, lkb);
2042 }
2043
2044 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2045 {
2046 if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2047 lkb->lkb_grmode = lkb->lkb_rqmode;
2048 if (lkb->lkb_status)
2049 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2050 else
2051 add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2052 }
2053
2054 lkb->lkb_rqmode = DLM_LOCK_IV;
2055 lkb->lkb_highbast = 0;
2056 }
2057
2058 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2059 {
2060 set_lvb_lock(r, lkb);
2061 _grant_lock(r, lkb);
2062 }
2063
2064 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2065 const struct dlm_message *ms)
2066 {
2067 set_lvb_lock_pc(r, lkb, ms);
2068 _grant_lock(r, lkb);
2069 }
2070
2071 /* called by grant_pending_locks() which means an async grant message must
2072 be sent to the requesting node in addition to granting the lock if the
2073 lkb belongs to a remote node. */
2074
2075 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
2076 {
2077 grant_lock(r, lkb);
2078 if (is_master_copy(lkb))
2079 send_grant(r, lkb);
2080 else
2081 queue_cast(r, lkb, 0);
2082 }
2083
2084 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2085 change the granted/requested modes. We're munging things accordingly in
2086 the process copy.
2087 CONVDEADLK: our grmode may have been forced down to NL to resolve a
2088 conversion deadlock
2089 ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2090 compatible with other granted locks */
2091
2092 static void munge_demoted(struct dlm_lkb *lkb)
2093 {
2094 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2095 log_print("munge_demoted %x invalid modes gr %d rq %d",
2096 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2097 return;
2098 }
2099
2100 lkb->lkb_grmode = DLM_LOCK_NL;
2101 }
2102
2103 static void munge_altmode(struct dlm_lkb *lkb, const struct dlm_message *ms)
2104 {
2105 if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
2106 ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
2107 log_print("munge_altmode %x invalid reply type %d",
2108 lkb->lkb_id, le32_to_cpu(ms->m_type));
2109 return;
2110 }
2111
2112 if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2113 lkb->lkb_rqmode = DLM_LOCK_PR;
2114 else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2115 lkb->lkb_rqmode = DLM_LOCK_CW;
2116 else {
2117 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2118 dlm_print_lkb(lkb);
2119 }
2120 }
2121
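/* return 1 if lkb is the first entry on the given queue */
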
2122 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2123 {
2124 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2125 lkb_statequeue);
2126 if (lkb->lkb_id == first->lkb_id)
2127 return 1;
2128
2129 return 0;
2130 }
2131
2132 /* Check if the given lkb conflicts with another lkb on the queue. */
2133
2134 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2135 {
2136 struct dlm_lkb *this;
2137
2138 list_for_each_entry(this, head, lkb_statequeue) {
2139 if (this == lkb)
2140 continue;
2141 if (!modes_compat(this, lkb))
2142 return 1;
2143 }
2144 return 0;
2145 }
2146
2147 /*
2148 * "A conversion deadlock arises with a pair of lock requests in the converting
2149 * queue for one resource. The granted mode of each lock blocks the requested
2150 * mode of the other lock."
2151 *
2152 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2153 * convert queue from being granted, then deadlk/demote lkb.
2154 *
2155 * Example:
2156 * Granted Queue: empty
2157 * Convert Queue: NL->EX (first lock)
2158 * PR->EX (second lock)
2159 *
2160 * The first lock can't be granted because of the granted mode of the second
2161 * lock and the second lock can't be granted because it's not first in the
2162 * list. We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2163 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2164 * flag set and return DEMOTED in the lksb flags.
2165 *
2166 * Originally, this function detected conv-deadlk in a more limited scope:
2167 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2168 * - if lkb1 was the first entry in the queue (not just earlier), and was
2169 * blocked by the granted mode of lkb2, and there was nothing on the
2170 * granted queue preventing lkb1 from being granted immediately, i.e.
2171 * lkb2 was the only thing preventing lkb1 from being granted.
2172 *
2173 * That second condition meant we'd only say there was conv-deadlk if
2174 * resolving it (by demotion) would lead to the first lock on the convert
2175 * queue being granted right away. It allowed conversion deadlocks to exist
2176 * between locks on the convert queue while they couldn't be granted anyway.
2177 *
2178 * Now, we detect and take action on conversion deadlocks immediately when
2179 * they're created, even if they may not be immediately consequential. If
2180 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2181 * mode that would prevent lkb1's conversion from being granted, we do a
2182 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2183 * I think this means that the lkb_is_ahead condition below should always
2184 * be zero, i.e. there will never be conv-deadlk between two locks that are
2185 * both already on the convert queue.
2186 */
2187
2188 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2189 {
2190 struct dlm_lkb *lkb1;
2191 int lkb_is_ahead = 0;
2192
2193 list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2194 if (lkb1 == lkb2) {
2195 lkb_is_ahead = 1;
2196 continue;
2197 }
2198
2199 if (!lkb_is_ahead) {
2200 if (!modes_compat(lkb2, lkb1))
2201 return 1;
2202 } else {
2203 if (!modes_compat(lkb2, lkb1) &&
2204 !modes_compat(lkb1, lkb2))
2205 return 1;
2206 }
2207 }
2208 return 0;
2209 }
2210
2211 /*
2212 * Return 1 if the lock can be granted, 0 otherwise.
2213 * Also detect and resolve conversion deadlocks.
2214 *
2215 * lkb is the lock to be granted
2216 *
2217 * now is 1 if the function is being called in the context of the
2218 * immediate request, it is 0 if called later, after the lock has been
2219 * queued.
2220 *
2221 * recover is 1 if dlm_recover_grant() is trying to grant conversions
2222 * after recovery.
2223 *
2224 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2225 */
2226
2227 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2228 int recover)
2229 {
2230 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2231
2232 /*
2233 * 6-10: Version 5.4 introduced an option to address the phenomenon of
2234 * a new request for a NL mode lock being blocked.
2235 *
2236 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2237 * request, then it would be granted. In essence, the use of this flag
2238 	 * tells the Lock Manager to expedite this request by not considering
2239 * what may be in the CONVERTING or WAITING queues... As of this
2240 * writing, the EXPEDITE flag can be used only with new requests for NL
2241 * mode locks. This flag is not valid for conversion requests.
2242 *
2243 * A shortcut. Earlier checks return an error if EXPEDITE is used in a
2244 * conversion or used with a non-NL requested mode. We also know an
2245 * EXPEDITE request is always granted immediately, so now must always
2246 * be 1. The full condition to grant an expedite request: (now &&
2247 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2248 * therefore be shortened to just checking the flag.
2249 */
2250
2251 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2252 return 1;
2253
2254 /*
2255 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2256 * added to the remaining conditions.
2257 */
2258
2259 if (queue_conflict(&r->res_grantqueue, lkb))
2260 return 0;
2261
2262 /*
2263 * 6-3: By default, a conversion request is immediately granted if the
2264 * requested mode is compatible with the modes of all other granted
2265 * locks
2266 */
2267
2268 if (queue_conflict(&r->res_convertqueue, lkb))
2269 return 0;
2270
2271 /*
2272 * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2273 * locks for a recovered rsb, on which lkb's have been rebuilt.
2274 * The lkb's may have been rebuilt on the queues in a different
2275 * order than they were in on the previous master. So, granting
2276 * queued conversions in order after recovery doesn't make sense
2277 * since the order hasn't been preserved anyway. The new order
2278 * could also have created a new "in place" conversion deadlock.
2279 * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2280 * After recovery, there would be no granted locks, and possibly
2281 * NL->EX, PR->EX, an in-place conversion deadlock.) So, after
2282 * recovery, grant conversions without considering order.
2283 */
2284
2285 if (conv && recover)
2286 return 1;
2287
2288 /*
2289 * 6-5: But the default algorithm for deciding whether to grant or
2290 * queue conversion requests does not by itself guarantee that such
2291 * requests are serviced on a "first come first serve" basis. This, in
2292 	 * turn, can lead to a phenomenon known as "indefinite postponement".
2293 *
2294 * 6-7: This issue is dealt with by using the optional QUECVT flag with
2295 * the system service employed to request a lock conversion. This flag
2296 * forces certain conversion requests to be queued, even if they are
2297 * compatible with the granted modes of other locks on the same
2298 * resource. Thus, the use of this flag results in conversion requests
2299 	 * being ordered on a "first come first serve" basis.
2300 *
2301 * DCT: This condition is all about new conversions being able to occur
2302 * "in place" while the lock remains on the granted queue (assuming
2303 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion
2304 * doesn't _have_ to go onto the convert queue where it's processed in
2305 * order. The "now" variable is necessary to distinguish converts
2306 * being received and processed for the first time now, because once a
2307 * convert is moved to the conversion queue the condition below applies
2308 * requiring fifo granting.
2309 */
2310
2311 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2312 return 1;
2313
2314 /*
2315 * Even if the convert is compat with all granted locks,
2316 * QUECVT forces it behind other locks on the convert queue.
2317 */
2318
2319 if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2320 if (list_empty(&r->res_convertqueue))
2321 return 1;
2322 else
2323 return 0;
2324 }
2325
2326 /*
2327 * The NOORDER flag is set to avoid the standard vms rules on grant
2328 * order.
2329 */
2330
2331 if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2332 return 1;
2333
2334 /*
2335 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2336 * granted until all other conversion requests ahead of it are granted
2337 * and/or canceled.
2338 */
2339
2340 if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2341 return 1;
2342
2343 /*
2344 * 6-4: By default, a new request is immediately granted only if all
2345 * three of the following conditions are satisfied when the request is
2346 * issued:
2347 * - The queue of ungranted conversion requests for the resource is
2348 * empty.
2349 * - The queue of ungranted new requests for the resource is empty.
2350 * - The mode of the new request is compatible with the most
2351 * restrictive mode of all granted locks on the resource.
2352 */
2353
2354 if (now && !conv && list_empty(&r->res_convertqueue) &&
2355 list_empty(&r->res_waitqueue))
2356 return 1;
2357
2358 /*
2359 * 6-4: Once a lock request is in the queue of ungranted new requests,
2360 * it cannot be granted until the queue of ungranted conversion
2361 * requests is empty, all ungranted new requests ahead of it are
2362 * granted and/or canceled, and it is compatible with the granted mode
2363 * of the most restrictive lock granted on the resource.
2364 */
2365
2366 if (!now && !conv && list_empty(&r->res_convertqueue) &&
2367 first_in_list(lkb, &r->res_waitqueue))
2368 return 1;
2369
2370 return 0;
2371 }
2372
2373 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2374 int recover, int *err)
2375 {
2376 int rv;
2377 int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2378 int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2379
2380 if (err)
2381 *err = 0;
2382
2383 rv = _can_be_granted(r, lkb, now, recover);
2384 if (rv)
2385 goto out;
2386
2387 /*
2388 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2389 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2390 * cancels one of the locks.
2391 */
2392
2393 if (is_convert && can_be_queued(lkb) &&
2394 conversion_deadlock_detect(r, lkb)) {
2395 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2396 lkb->lkb_grmode = DLM_LOCK_NL;
2397 set_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
2398 } else if (err) {
2399 *err = -EDEADLK;
2400 } else {
2401 log_print("can_be_granted deadlock %x now %d",
2402 lkb->lkb_id, now);
2403 dlm_dump_rsb(r);
2404 }
2405 goto out;
2406 }
2407
2408 /*
2409 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2410 * to grant a request in a mode other than the normal rqmode. It's a
2411 * simple way to provide a big optimization to applications that can
2412 * use them.
2413 */
2414
2415 if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2416 alt = DLM_LOCK_PR;
2417 else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2418 alt = DLM_LOCK_CW;
2419
2420 if (alt) {
2421 lkb->lkb_rqmode = alt;
2422 rv = _can_be_granted(r, lkb, now, 0);
2423 if (rv)
2424 set_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
2425 else
2426 lkb->lkb_rqmode = rqmode;
2427 }
2428 out:
2429 return rv;
2430 }
2431
2432 /* Returns the highest requested mode of all blocked conversions; sets
2433 cw if there's a blocked conversion to DLM_LOCK_CW. */
2434
2435 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2436 unsigned int *count)
2437 {
2438 struct dlm_lkb *lkb, *s;
2439 int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2440 int hi, demoted, quit, grant_restart, demote_restart;
2441 int deadlk;
2442
2443 quit = 0;
2444 restart:
2445 grant_restart = 0;
2446 demote_restart = 0;
2447 hi = DLM_LOCK_IV;
2448
2449 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2450 demoted = is_demoted(lkb);
2451 deadlk = 0;
2452
2453 if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2454 grant_lock_pending(r, lkb);
2455 grant_restart = 1;
2456 if (count)
2457 (*count)++;
2458 continue;
2459 }
2460
2461 if (!demoted && is_demoted(lkb)) {
2462 log_print("WARN: pending demoted %x node %d %s",
2463 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2464 demote_restart = 1;
2465 continue;
2466 }
2467
2468 if (deadlk) {
2469 /*
2470 			 * If the DLM_LKF_NODLCKWT flag is set and a conversion
2471 			 * deadlock is detected, we request a blocking AST and
2472 			 * leave it to the caller to demote (or cancel) the
2473 			 * conversion.
2473 */
2474 if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2475 if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2476 queue_bast(r, lkb, lkb->lkb_rqmode);
2477 lkb->lkb_highbast = lkb->lkb_rqmode;
2478 }
2479 } else {
2480 log_print("WARN: pending deadlock %x node %d %s",
2481 lkb->lkb_id, lkb->lkb_nodeid,
2482 r->res_name);
2483 dlm_dump_rsb(r);
2484 }
2485 continue;
2486 }
2487
2488 hi = max_t(int, lkb->lkb_rqmode, hi);
2489
2490 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2491 *cw = 1;
2492 }
2493
2494 if (grant_restart)
2495 goto restart;
2496 if (demote_restart && !quit) {
2497 quit = 1;
2498 goto restart;
2499 }
2500
2501 return max_t(int, high, hi);
2502 }
2503
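/* Grant any requests on the wait queue that can now be granted; returns the
   highest rqmode of the requests left blocked, and sets cw if one of them is
   waiting for DLM_LOCK_CW. */
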
2504 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2505 unsigned int *count)
2506 {
2507 struct dlm_lkb *lkb, *s;
2508
2509 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2510 if (can_be_granted(r, lkb, 0, 0, NULL)) {
2511 grant_lock_pending(r, lkb);
2512 if (count)
2513 (*count)++;
2514 } else {
2515 high = max_t(int, lkb->lkb_rqmode, high);
2516 if (lkb->lkb_rqmode == DLM_LOCK_CW)
2517 *cw = 1;
2518 }
2519 }
2520
2521 return high;
2522 }
2523
2524 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2525 on either the convert or waiting queue.
2526 high is the largest rqmode of all locks blocked on the convert or
2527 waiting queue. */
2528
2529 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2530 {
2531 if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2532 if (gr->lkb_highbast < DLM_LOCK_EX)
2533 return 1;
2534 return 0;
2535 }
2536
2537 if (gr->lkb_highbast < high &&
2538 !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2539 return 1;
2540 return 0;
2541 }
2542
2543 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2544 {
2545 struct dlm_lkb *lkb, *s;
2546 int high = DLM_LOCK_IV;
2547 int cw = 0;
2548
2549 if (!is_master(r)) {
2550 log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2551 dlm_dump_rsb(r);
2552 return;
2553 }
2554
2555 high = grant_pending_convert(r, high, &cw, count);
2556 high = grant_pending_wait(r, high, &cw, count);
2557
2558 if (high == DLM_LOCK_IV)
2559 return;
2560
2561 /*
2562 * If there are locks left on the wait/convert queue then send blocking
2563 * ASTs to granted locks based on the largest requested mode (high)
2564 * found above.
2565 */
2566
2567 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2568 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2569 if (cw && high == DLM_LOCK_PR &&
2570 lkb->lkb_grmode == DLM_LOCK_PR)
2571 queue_bast(r, lkb, DLM_LOCK_CW);
2572 else
2573 queue_bast(r, lkb, high);
2574 lkb->lkb_highbast = high;
2575 }
2576 }
2577 }
2578
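/* return 1 if the granted lock gr should be sent a blocking ast because of
   the mode requested by rq */
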
2579 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2580 {
2581 if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2582 (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2583 if (gr->lkb_highbast < DLM_LOCK_EX)
2584 return 1;
2585 return 0;
2586 }
2587
2588 if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2589 return 1;
2590 return 0;
2591 }
2592
2593 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2594 struct dlm_lkb *lkb)
2595 {
2596 struct dlm_lkb *gr;
2597
2598 list_for_each_entry(gr, head, lkb_statequeue) {
2599 /* skip self when sending basts to convertqueue */
2600 if (gr == lkb)
2601 continue;
2602 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2603 queue_bast(r, gr, lkb->lkb_rqmode);
2604 gr->lkb_highbast = lkb->lkb_rqmode;
2605 }
2606 }
2607 }
2608
2609 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2610 {
2611 send_bast_queue(r, &r->res_grantqueue, lkb);
2612 }
2613
2614 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2615 {
2616 send_bast_queue(r, &r->res_grantqueue, lkb);
2617 send_bast_queue(r, &r->res_convertqueue, lkb);
2618 }
2619
2620 /* set_master(r, lkb) -- set the master nodeid of a resource
2621
2622 The purpose of this function is to set the nodeid field in the given
2623 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
2624 known, it can just be copied to the lkb and the function will return
2625 0. If the rsb's nodeid is _not_ known, it needs to be looked up
2626 before it can be copied to the lkb.
2627
2628 When the rsb nodeid is being looked up remotely, the initial lkb
2629 causing the lookup is kept on the ls_waiters list waiting for the
2630 lookup reply. Other lkb's waiting for the same rsb lookup are kept
2631 on the rsb's res_lookup list until the master is verified.
2632
2633 Return values:
2634 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2635 1: the rsb master is not available and the lkb has been placed on
2636 a wait queue
2637 */
2638
2639 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2640 {
2641 int our_nodeid = dlm_our_nodeid();
2642
2643 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2644 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2645 r->res_first_lkid = lkb->lkb_id;
2646 lkb->lkb_nodeid = r->res_nodeid;
2647 return 0;
2648 }
2649
2650 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2651 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2652 return 1;
2653 }
2654
2655 if (r->res_master_nodeid == our_nodeid) {
2656 lkb->lkb_nodeid = 0;
2657 return 0;
2658 }
2659
2660 if (r->res_master_nodeid) {
2661 lkb->lkb_nodeid = r->res_master_nodeid;
2662 return 0;
2663 }
2664
2665 if (dlm_dir_nodeid(r) == our_nodeid) {
2666 /* This is a somewhat unusual case; find_rsb will usually
2667 have set res_master_nodeid when dir nodeid is local, but
2668 there are cases where we become the dir node after we've
2669 		   passed find_rsb and go through _request_lock again.
2670 confirm_master() or process_lookup_list() needs to be
2671 called after this. */
2672 log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2673 lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2674 r->res_name);
2675 r->res_master_nodeid = our_nodeid;
2676 r->res_nodeid = 0;
2677 lkb->lkb_nodeid = 0;
2678 return 0;
2679 }
2680
2681 r->res_first_lkid = lkb->lkb_id;
2682 send_lookup(r, lkb);
2683 return 1;
2684 }
2685
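/* restart the request for each lkb that was queued on the rsb waiting for
   the master lookup to complete */
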
2686 static void process_lookup_list(struct dlm_rsb *r)
2687 {
2688 struct dlm_lkb *lkb, *safe;
2689
2690 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2691 list_del_init(&lkb->lkb_rsb_lookup);
2692 _request_lock(r, lkb);
2693 }
2694 }
2695
2696 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2697
2698 static void confirm_master(struct dlm_rsb *r, int error)
2699 {
2700 struct dlm_lkb *lkb;
2701
2702 if (!r->res_first_lkid)
2703 return;
2704
2705 switch (error) {
2706 case 0:
2707 case -EINPROGRESS:
2708 r->res_first_lkid = 0;
2709 process_lookup_list(r);
2710 break;
2711
2712 case -EAGAIN:
2713 case -EBADR:
2714 case -ENOTBLK:
2715 /* the remote request failed and won't be retried (it was
2716 a NOQUEUE, or has been canceled/unlocked); make a waiting
2717 lkb the first_lkid */
2718
2719 r->res_first_lkid = 0;
2720
2721 if (!list_empty(&r->res_lookup)) {
2722 lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2723 lkb_rsb_lookup);
2724 list_del_init(&lkb->lkb_rsb_lookup);
2725 r->res_first_lkid = lkb->lkb_id;
2726 _request_lock(r, lkb);
2727 }
2728 break;
2729
2730 default:
2731 log_error(r->res_ls, "confirm_master unknown error %d", error);
2732 }
2733 }
2734
2735 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2736 int namelen, void (*ast)(void *astparam),
2737 void *astparam,
2738 void (*bast)(void *astparam, int mode),
2739 struct dlm_args *args)
2740 {
2741 int rv = -EINVAL;
2742
2743 /* check for invalid arg usage */
2744
2745 if (mode < 0 || mode > DLM_LOCK_EX)
2746 goto out;
2747
2748 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2749 goto out;
2750
2751 if (flags & DLM_LKF_CANCEL)
2752 goto out;
2753
2754 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2755 goto out;
2756
2757 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2758 goto out;
2759
2760 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2761 goto out;
2762
2763 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2764 goto out;
2765
2766 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2767 goto out;
2768
2769 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2770 goto out;
2771
2772 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2773 goto out;
2774
2775 if (!ast || !lksb)
2776 goto out;
2777
2778 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2779 goto out;
2780
2781 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2782 goto out;
2783
2784 /* these args will be copied to the lkb in validate_lock_args,
2785 it cannot be done now because when converting locks, fields in
2786 an active lkb cannot be modified before locking the rsb */
2787
2788 args->flags = flags;
2789 args->astfn = ast;
2790 args->astparam = astparam;
2791 args->bastfn = bast;
2792 args->mode = mode;
2793 args->lksb = lksb;
2794 rv = 0;
2795 out:
2796 return rv;
2797 }
2798
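/* check the dlm_unlock() flags and save them (and the ast argument) for
   validate_unlock_args() to copy into the lkb */
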
2799 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2800 {
2801 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2802 DLM_LKF_FORCEUNLOCK))
2803 return -EINVAL;
2804
2805 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2806 return -EINVAL;
2807
2808 args->flags = flags;
2809 args->astparam = astarg;
2810 return 0;
2811 }
2812
2813 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2814 struct dlm_args *args)
2815 {
2816 int rv = -EBUSY;
2817
2818 if (args->flags & DLM_LKF_CONVERT) {
2819 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2820 goto out;
2821
2822 /* lock not allowed if there's any op in progress */
2823 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2824 goto out;
2825
2826 if (is_overlap(lkb))
2827 goto out;
2828
2829 rv = -EINVAL;
2830 if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
2831 goto out;
2832
2833 if (args->flags & DLM_LKF_QUECVT &&
2834 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2835 goto out;
2836 }
2837
2838 lkb->lkb_exflags = args->flags;
2839 dlm_set_sbflags_val(lkb, 0);
2840 lkb->lkb_astfn = args->astfn;
2841 lkb->lkb_astparam = args->astparam;
2842 lkb->lkb_bastfn = args->bastfn;
2843 lkb->lkb_rqmode = args->mode;
2844 lkb->lkb_lksb = args->lksb;
2845 lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2846 lkb->lkb_ownpid = (int) current->pid;
2847 rv = 0;
2848 out:
2849 switch (rv) {
2850 case 0:
2851 break;
2852 case -EINVAL:
2853 /* annoy the user because dlm usage is wrong */
2854 WARN_ON(1);
2855 log_error(ls, "%s %d %x %x %x %d %d", __func__,
2856 rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2857 lkb->lkb_status, lkb->lkb_wait_type);
2858 break;
2859 default:
2860 log_debug(ls, "%s %d %x %x %x %d %d", __func__,
2861 rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2862 lkb->lkb_status, lkb->lkb_wait_type);
2863 break;
2864 }
2865
2866 return rv;
2867 }
2868
2869 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2870 for success */
2871
2872 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2873 because there may be a lookup in progress and it's valid to do
2874 cancel/unlockf on it */
2875
2876 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2877 {
2878 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2879 int rv = -EBUSY;
2880
2881 /* normal unlock not allowed if there's any op in progress */
2882 if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) &&
2883 (lkb->lkb_wait_type || lkb->lkb_wait_count))
2884 goto out;
2885
2886 /* an lkb may be waiting for an rsb lookup to complete where the
2887 lookup was initiated by another lock */
2888
2889 if (!list_empty(&lkb->lkb_rsb_lookup)) {
2890 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2891 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2892 list_del_init(&lkb->lkb_rsb_lookup);
2893 queue_cast(lkb->lkb_resource, lkb,
2894 args->flags & DLM_LKF_CANCEL ?
2895 -DLM_ECANCEL : -DLM_EUNLOCK);
2896 unhold_lkb(lkb); /* undoes create_lkb() */
2897 }
2898 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2899 goto out;
2900 }
2901
2902 rv = -EINVAL;
2903 if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
2904 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2905 dlm_print_lkb(lkb);
2906 goto out;
2907 }
2908
2909 /* an lkb may still exist even though the lock is EOL'ed due to a
2910 * cancel, unlock or failed noqueue request; an app can't use these
2911 * locks; return same error as if the lkid had not been found at all
2912 */
2913
2914 if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
2915 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2916 rv = -ENOENT;
2917 goto out;
2918 }
2919
2920 if (is_overlap_unlock(lkb))
2921 goto out;
2922
2923 /* cancel not allowed with another cancel/unlock in progress */
2924
2925 if (args->flags & DLM_LKF_CANCEL) {
2926 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2927 goto out;
2928
2929 if (is_overlap_cancel(lkb))
2930 goto out;
2931
2932 if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2933 set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2934 rv = -EBUSY;
2935 goto out;
2936 }
2937
2938 /* there's nothing to cancel */
2939 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2940 !lkb->lkb_wait_type) {
2941 rv = -EBUSY;
2942 goto out;
2943 }
2944
2945 switch (lkb->lkb_wait_type) {
2946 case DLM_MSG_LOOKUP:
2947 case DLM_MSG_REQUEST:
2948 set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2949 rv = -EBUSY;
2950 goto out;
2951 case DLM_MSG_UNLOCK:
2952 case DLM_MSG_CANCEL:
2953 goto out;
2954 }
2955 /* add_to_waiters() will set OVERLAP_CANCEL */
2956 goto out_ok;
2957 }
2958
2959 /* do we need to allow a force-unlock if there's a normal unlock
2960 already in progress? in what conditions could the normal unlock
2961 fail such that we'd want to send a force-unlock to be sure? */
2962
2963 if (args->flags & DLM_LKF_FORCEUNLOCK) {
2964 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2965 goto out;
2966
2967 if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2968 set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2969 rv = -EBUSY;
2970 goto out;
2971 }
2972
2973 switch (lkb->lkb_wait_type) {
2974 case DLM_MSG_LOOKUP:
2975 case DLM_MSG_REQUEST:
2976 set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2977 rv = -EBUSY;
2978 goto out;
2979 case DLM_MSG_UNLOCK:
2980 goto out;
2981 }
2982 /* add_to_waiters() will set OVERLAP_UNLOCK */
2983 }
2984
2985 out_ok:
2986 /* an overlapping op shouldn't blow away exflags from other op */
2987 lkb->lkb_exflags |= args->flags;
2988 dlm_set_sbflags_val(lkb, 0);
2989 lkb->lkb_astparam = args->astparam;
2990 rv = 0;
2991 out:
2992 switch (rv) {
2993 case 0:
2994 break;
2995 case -EINVAL:
2996 /* annoy the user because dlm usage is wrong */
2997 WARN_ON(1);
2998 log_error(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
2999 lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
3000 args->flags, lkb->lkb_wait_type,
3001 lkb->lkb_resource->res_name);
3002 break;
3003 default:
3004 log_debug(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
3005 lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
3006 args->flags, lkb->lkb_wait_type,
3007 lkb->lkb_resource->res_name);
3008 break;
3009 }
3010
3011 return rv;
3012 }
3013
3014 /*
3015 * Four stage 4 varieties:
3016 * do_request(), do_convert(), do_unlock(), do_cancel()
3017 * These are called on the master node for the given lock and
3018 * from the central locking logic.
3019 */
3020
3021 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3022 {
3023 int error = 0;
3024
3025 if (can_be_granted(r, lkb, 1, 0, NULL)) {
3026 grant_lock(r, lkb);
3027 queue_cast(r, lkb, 0);
3028 goto out;
3029 }
3030
3031 if (can_be_queued(lkb)) {
3032 error = -EINPROGRESS;
3033 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3034 goto out;
3035 }
3036
3037 error = -EAGAIN;
3038 queue_cast(r, lkb, -EAGAIN);
3039 out:
3040 return error;
3041 }
3042
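/* after do_request(), send any blocking asts implied by the result:
   -EAGAIN may send basts to all queues, -EINPROGRESS sends basts to the
   granted queue */
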
3043 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3044 int error)
3045 {
3046 switch (error) {
3047 case -EAGAIN:
3048 if (force_blocking_asts(lkb))
3049 send_blocking_asts_all(r, lkb);
3050 break;
3051 case -EINPROGRESS:
3052 send_blocking_asts(r, lkb);
3053 break;
3054 }
3055 }
3056
3057 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3058 {
3059 int error = 0;
3060 int deadlk = 0;
3061
3062 /* changing an existing lock may allow others to be granted */
3063
3064 if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3065 grant_lock(r, lkb);
3066 queue_cast(r, lkb, 0);
3067 goto out;
3068 }
3069
3070 /* can_be_granted() detected that this lock would block in a conversion
3071 deadlock, so we leave it on the granted queue and return EDEADLK in
3072 the ast for the convert. */
3073
3074 if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
3075 /* it's left on the granted queue */
3076 revert_lock(r, lkb);
3077 queue_cast(r, lkb, -EDEADLK);
3078 error = -EDEADLK;
3079 goto out;
3080 }
3081
3082 /* is_demoted() means the can_be_granted() above set the grmode
3083 to NL, and left us on the granted queue. This auto-demotion
3084 (due to CONVDEADLK) might mean other locks, and/or this lock, are
3085 now grantable. We have to try to grant other converting locks
3086 before we try again to grant this one. */
3087
3088 if (is_demoted(lkb)) {
3089 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3090 if (_can_be_granted(r, lkb, 1, 0)) {
3091 grant_lock(r, lkb);
3092 queue_cast(r, lkb, 0);
3093 goto out;
3094 }
3095 /* else fall through and move to convert queue */
3096 }
3097
3098 if (can_be_queued(lkb)) {
3099 error = -EINPROGRESS;
3100 del_lkb(r, lkb);
3101 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3102 goto out;
3103 }
3104
3105 error = -EAGAIN;
3106 queue_cast(r, lkb, -EAGAIN);
3107 out:
3108 return error;
3109 }
3110
3111 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3112 int error)
3113 {
3114 switch (error) {
3115 case 0:
3116 grant_pending_locks(r, NULL);
3117 /* grant_pending_locks also sends basts */
3118 break;
3119 case -EAGAIN:
3120 if (force_blocking_asts(lkb))
3121 send_blocking_asts_all(r, lkb);
3122 break;
3123 case -EINPROGRESS:
3124 send_blocking_asts(r, lkb);
3125 break;
3126 }
3127 }
3128
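/* returns: -DLM_EUNLOCK, lock removed and unlock ast queued */
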
3129 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3130 {
3131 remove_lock(r, lkb);
3132 queue_cast(r, lkb, -DLM_EUNLOCK);
3133 return -DLM_EUNLOCK;
3134 }
3135
3136 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3137 int error)
3138 {
3139 grant_pending_locks(r, NULL);
3140 }
3141
3142 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3143
3144 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3145 {
3146 int error;
3147
3148 error = revert_lock(r, lkb);
3149 if (error) {
3150 queue_cast(r, lkb, -DLM_ECANCEL);
3151 return -DLM_ECANCEL;
3152 }
3153 return 0;
3154 }
3155
3156 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3157 int error)
3158 {
3159 if (error)
3160 grant_pending_locks(r, NULL);
3161 }
3162
3163 /*
3164 * Four stage 3 varieties:
3165 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3166 */
3167
3168 /* add a new lkb to a possibly new rsb, called by requesting process */
3169
3170 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3171 {
3172 int error;
3173
3174 /* set_master: sets lkb nodeid from r */
3175
3176 error = set_master(r, lkb);
3177 if (error < 0)
3178 goto out;
3179 if (error) {
3180 error = 0;
3181 goto out;
3182 }
3183
3184 if (is_remote(r)) {
3185 /* receive_request() calls do_request() on remote node */
3186 error = send_request(r, lkb);
3187 } else {
3188 error = do_request(r, lkb);
3189 /* for remote locks the request_reply is sent
3190 between do_request and do_request_effects */
3191 do_request_effects(r, lkb, error);
3192 }
3193 out:
3194 return error;
3195 }
3196
3197 /* change some property of an existing lkb, e.g. mode */
3198
3199 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3200 {
3201 int error;
3202
3203 if (is_remote(r)) {
3204 /* receive_convert() calls do_convert() on remote node */
3205 error = send_convert(r, lkb);
3206 } else {
3207 error = do_convert(r, lkb);
3208 /* for remote locks the convert_reply is sent
3209 between do_convert and do_convert_effects */
3210 do_convert_effects(r, lkb, error);
3211 }
3212
3213 return error;
3214 }
3215
3216 /* remove an existing lkb from the granted queue */
3217
3218 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3219 {
3220 int error;
3221
3222 if (is_remote(r)) {
3223 /* receive_unlock() calls do_unlock() on remote node */
3224 error = send_unlock(r, lkb);
3225 } else {
3226 error = do_unlock(r, lkb);
3227 /* for remote locks the unlock_reply is sent
3228 between do_unlock and do_unlock_effects */
3229 do_unlock_effects(r, lkb, error);
3230 }
3231
3232 return error;
3233 }
3234
3235 /* remove an existing lkb from the convert or wait queue */
3236
3237 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3238 {
3239 int error;
3240
3241 if (is_remote(r)) {
3242 /* receive_cancel() calls do_cancel() on remote node */
3243 error = send_cancel(r, lkb);
3244 } else {
3245 error = do_cancel(r, lkb);
3246 /* for remote locks the cancel_reply is sent
3247 between do_cancel and do_cancel_effects */
3248 do_cancel_effects(r, lkb, error);
3249 }
3250
3251 return error;
3252 }
3253
3254 /*
3255 * Four stage 2 varieties:
3256 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3257 */
3258
3259 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3260 const void *name, int len,
3261 struct dlm_args *args)
3262 {
3263 struct dlm_rsb *r;
3264 int error;
3265
3266 error = validate_lock_args(ls, lkb, args);
3267 if (error)
3268 return error;
3269
3270 error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3271 if (error)
3272 return error;
3273
3274 lock_rsb(r);
3275
3276 attach_lkb(r, lkb);
3277 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3278
3279 error = _request_lock(r, lkb);
3280
3281 unlock_rsb(r);
3282 put_rsb(r);
3283 return error;
3284 }
3285
3286 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3287 struct dlm_args *args)
3288 {
3289 struct dlm_rsb *r;
3290 int error;
3291
3292 r = lkb->lkb_resource;
3293
3294 hold_rsb(r);
3295 lock_rsb(r);
3296
3297 error = validate_lock_args(ls, lkb, args);
3298 if (error)
3299 goto out;
3300
3301 error = _convert_lock(r, lkb);
3302 out:
3303 unlock_rsb(r);
3304 put_rsb(r);
3305 return error;
3306 }
3307
3308 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3309 struct dlm_args *args)
3310 {
3311 struct dlm_rsb *r;
3312 int error;
3313
3314 r = lkb->lkb_resource;
3315
3316 hold_rsb(r);
3317 lock_rsb(r);
3318
3319 error = validate_unlock_args(lkb, args);
3320 if (error)
3321 goto out;
3322
3323 error = _unlock_lock(r, lkb);
3324 out:
3325 unlock_rsb(r);
3326 put_rsb(r);
3327 return error;
3328 }
3329
3330 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3331 struct dlm_args *args)
3332 {
3333 struct dlm_rsb *r;
3334 int error;
3335
3336 r = lkb->lkb_resource;
3337
3338 hold_rsb(r);
3339 lock_rsb(r);
3340
3341 error = validate_unlock_args(lkb, args);
3342 if (error)
3343 goto out;
3344
3345 error = _cancel_lock(r, lkb);
3346 out:
3347 unlock_rsb(r);
3348 put_rsb(r);
3349 return error;
3350 }
3351
3352 /*
3353 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
3354 */
3355
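/* dlm_lock() requests a new lock or, with DLM_LKF_CONVERT, converts an
   existing one.  Completion is reported asynchronously through the ast
   callback, so a return of 0 only means the request was accepted. */
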
3356 int dlm_lock(dlm_lockspace_t *lockspace,
3357 int mode,
3358 struct dlm_lksb *lksb,
3359 uint32_t flags,
3360 const void *name,
3361 unsigned int namelen,
3362 uint32_t parent_lkid,
3363 void (*ast) (void *astarg),
3364 void *astarg,
3365 void (*bast) (void *astarg, int mode))
3366 {
3367 struct dlm_ls *ls;
3368 struct dlm_lkb *lkb;
3369 struct dlm_args args;
3370 int error, convert = flags & DLM_LKF_CONVERT;
3371
3372 ls = dlm_find_lockspace_local(lockspace);
3373 if (!ls)
3374 return -EINVAL;
3375
3376 dlm_lock_recovery(ls);
3377
3378 if (convert)
3379 error = find_lkb(ls, lksb->sb_lkid, &lkb);
3380 else
3381 error = create_lkb(ls, &lkb);
3382
3383 if (error)
3384 goto out;
3385
3386 trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
3387
3388 error = set_lock_args(mode, lksb, flags, namelen, ast, astarg, bast,
3389 &args);
3390 if (error)
3391 goto out_put;
3392
3393 if (convert)
3394 error = convert_lock(ls, lkb, &args);
3395 else
3396 error = request_lock(ls, lkb, name, namelen, &args);
3397
3398 if (error == -EINPROGRESS)
3399 error = 0;
3400 out_put:
3401 trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true);
3402
3403 if (convert || error)
3404 __put_lkb(ls, lkb);
3405 if (error == -EAGAIN || error == -EDEADLK)
3406 error = 0;
3407 out:
3408 dlm_unlock_recovery(ls);
3409 dlm_put_lockspace(ls);
3410 return error;
3411 }
3412
3413 int dlm_unlock(dlm_lockspace_t *lockspace,
3414 uint32_t lkid,
3415 uint32_t flags,
3416 struct dlm_lksb *lksb,
3417 void *astarg)
3418 {
3419 struct dlm_ls *ls;
3420 struct dlm_lkb *lkb;
3421 struct dlm_args args;
3422 int error;
3423
3424 ls = dlm_find_lockspace_local(lockspace);
3425 if (!ls)
3426 return -EINVAL;
3427
3428 dlm_lock_recovery(ls);
3429
3430 error = find_lkb(ls, lkid, &lkb);
3431 if (error)
3432 goto out;
3433
3434 trace_dlm_unlock_start(ls, lkb, flags);
3435
3436 error = set_unlock_args(flags, astarg, &args);
3437 if (error)
3438 goto out_put;
3439
3440 if (flags & DLM_LKF_CANCEL)
3441 error = cancel_lock(ls, lkb, &args);
3442 else
3443 error = unlock_lock(ls, lkb, &args);
3444
3445 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3446 error = 0;
3447 if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3448 error = 0;
3449 out_put:
3450 trace_dlm_unlock_end(ls, lkb, flags, error);
3451
3452 dlm_put_lkb(lkb);
3453 out:
3454 dlm_unlock_recovery(ls);
3455 dlm_put_lockspace(ls);
3456 return error;
3457 }
3458
3459 /*
3460 * send/receive routines for remote operations and replies
3461 *
3462 * send_args
3463 * send_common
3464 * send_request receive_request
3465 * send_convert receive_convert
3466 * send_unlock receive_unlock
3467 * send_cancel receive_cancel
3468 * send_grant receive_grant
3469 * send_bast receive_bast
3470 * send_lookup receive_lookup
3471 * send_remove receive_remove
3472 *
3473 * send_common_reply
3474 * receive_request_reply send_request_reply
3475 * receive_convert_reply send_convert_reply
3476 * receive_unlock_reply send_unlock_reply
3477 * receive_cancel_reply send_cancel_reply
3478 * receive_lookup_reply send_lookup_reply
3479 */
3480
3481 static int _create_message(struct dlm_ls *ls, int mb_len,
3482 int to_nodeid, int mstype,
3483 struct dlm_message **ms_ret,
3484 struct dlm_mhandle **mh_ret)
3485 {
3486 struct dlm_message *ms;
3487 struct dlm_mhandle *mh;
3488 char *mb;
3489
3490 /* get_buffer gives us a message handle (mh) that we need to
3491 pass into midcomms_commit and a message buffer (mb) that we
3492 write our data into */
3493
3494 mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb);
3495 if (!mh)
3496 return -ENOBUFS;
3497
3498 ms = (struct dlm_message *) mb;
3499
3500 ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3501 ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
3502 ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
3503 ms->m_header.h_length = cpu_to_le16(mb_len);
3504 ms->m_header.h_cmd = DLM_MSG;
3505
3506 ms->m_type = cpu_to_le32(mstype);
3507
3508 *mh_ret = mh;
3509 *ms_ret = ms;
3510 return 0;
3511 }
3512
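/* size the outgoing message: name-carrying messages need room for the
   resource name, lvb-carrying messages need room for the lvb */
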
3513 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3514 int to_nodeid, int mstype,
3515 struct dlm_message **ms_ret,
3516 struct dlm_mhandle **mh_ret)
3517 {
3518 int mb_len = sizeof(struct dlm_message);
3519
3520 switch (mstype) {
3521 case DLM_MSG_REQUEST:
3522 case DLM_MSG_LOOKUP:
3523 case DLM_MSG_REMOVE:
3524 mb_len += r->res_length;
3525 break;
3526 case DLM_MSG_CONVERT:
3527 case DLM_MSG_UNLOCK:
3528 case DLM_MSG_REQUEST_REPLY:
3529 case DLM_MSG_CONVERT_REPLY:
3530 case DLM_MSG_GRANT:
3531 if (lkb && lkb->lkb_lvbptr && (lkb->lkb_exflags & DLM_LKF_VALBLK))
3532 mb_len += r->res_ls->ls_lvblen;
3533 break;
3534 }
3535
3536 return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3537 ms_ret, mh_ret);
3538 }
3539
3540 /* further lowcomms enhancements or alternate implementations may make
3541 the return value from this function useful at some point */
3542
3543 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms,
3544 const void *name, int namelen)
3545 {
3546 dlm_midcomms_commit_mhandle(mh, name, namelen);
3547 return 0;
3548 }
3549
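/* Fill the fixed message fields from the lkb and rsb.  The variable part
   (resource name or lvb) is copied into m_extra according to the message
   type, mirroring the length chosen in create_message(). */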
3550 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3551 struct dlm_message *ms)
3552 {
3553 ms->m_nodeid = cpu_to_le32(lkb->lkb_nodeid);
3554 ms->m_pid = cpu_to_le32(lkb->lkb_ownpid);
3555 ms->m_lkid = cpu_to_le32(lkb->lkb_id);
3556 ms->m_remid = cpu_to_le32(lkb->lkb_remid);
3557 ms->m_exflags = cpu_to_le32(lkb->lkb_exflags);
3558 ms->m_sbflags = cpu_to_le32(dlm_sbflags_val(lkb));
3559 ms->m_flags = cpu_to_le32(dlm_dflags_val(lkb));
3560 ms->m_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
3561 ms->m_status = cpu_to_le32(lkb->lkb_status);
3562 ms->m_grmode = cpu_to_le32(lkb->lkb_grmode);
3563 ms->m_rqmode = cpu_to_le32(lkb->lkb_rqmode);
3564 ms->m_hash = cpu_to_le32(r->res_hash);
3565
3566 /* m_result and m_bastmode are set from function args,
3567 not from lkb fields */
3568
3569 if (lkb->lkb_bastfn)
3570 ms->m_asts |= cpu_to_le32(DLM_CB_BAST);
3571 if (lkb->lkb_astfn)
3572 ms->m_asts |= cpu_to_le32(DLM_CB_CAST);
3573
3574 /* compare with switch in create_message; send_remove() doesn't
3575 use send_args() */
3576
3577 switch (ms->m_type) {
3578 case cpu_to_le32(DLM_MSG_REQUEST):
3579 case cpu_to_le32(DLM_MSG_LOOKUP):
3580 memcpy(ms->m_extra, r->res_name, r->res_length);
3581 break;
3582 case cpu_to_le32(DLM_MSG_CONVERT):
3583 case cpu_to_le32(DLM_MSG_UNLOCK):
3584 case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3585 case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3586 case cpu_to_le32(DLM_MSG_GRANT):
3587 if (!lkb->lkb_lvbptr || !(lkb->lkb_exflags & DLM_LKF_VALBLK))
3588 break;
3589 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3590 break;
3591 }
3592 }
3593
3594 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3595 {
3596 struct dlm_message *ms;
3597 struct dlm_mhandle *mh;
3598 int to_nodeid, error;
3599
3600 to_nodeid = r->res_nodeid;
3601
3602 add_to_waiters(lkb, mstype, to_nodeid);
3603 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3604 if (error)
3605 goto fail;
3606
3607 send_args(r, lkb, ms);
3608
3609 error = send_message(mh, ms, r->res_name, r->res_length);
3610 if (error)
3611 goto fail;
3612 return 0;
3613
3614 fail:
3615 remove_from_waiters(lkb, msg_reply_type(mstype));
3616 return error;
3617 }
3618
3619 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3620 {
3621 return send_common(r, lkb, DLM_MSG_REQUEST);
3622 }
3623
3624 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3625 {
3626 int error;
3627
3628 error = send_common(r, lkb, DLM_MSG_CONVERT);
3629
3630 /* down conversions go without a reply from the master */
3631 if (!error && down_conversion(lkb)) {
3632 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3633 r->res_ls->ls_local_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
3634 r->res_ls->ls_local_ms.m_result = 0;
3635 __receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms, true);
3636 }
3637
3638 return error;
3639 }
3640
3641 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3642 MASTER_UNCERTAIN to force the next request on the rsb to confirm
3643 that the master is still correct. */
3644
3645 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3646 {
3647 return send_common(r, lkb, DLM_MSG_UNLOCK);
3648 }
3649
3650 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3651 {
3652 return send_common(r, lkb, DLM_MSG_CANCEL);
3653 }
3654
3655 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3656 {
3657 struct dlm_message *ms;
3658 struct dlm_mhandle *mh;
3659 int to_nodeid, error;
3660
3661 to_nodeid = lkb->lkb_nodeid;
3662
3663 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3664 if (error)
3665 goto out;
3666
3667 send_args(r, lkb, ms);
3668
3669 ms->m_result = 0;
3670
3671 error = send_message(mh, ms, r->res_name, r->res_length);
3672 out:
3673 return error;
3674 }
3675
3676 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3677 {
3678 struct dlm_message *ms;
3679 struct dlm_mhandle *mh;
3680 int to_nodeid, error;
3681
3682 to_nodeid = lkb->lkb_nodeid;
3683
3684 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3685 if (error)
3686 goto out;
3687
3688 send_args(r, lkb, ms);
3689
3690 ms->m_bastmode = cpu_to_le32(mode);
3691
3692 error = send_message(mh, ms, r->res_name, r->res_length);
3693 out:
3694 return error;
3695 }
3696
3697 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3698 {
3699 struct dlm_message *ms;
3700 struct dlm_mhandle *mh;
3701 int to_nodeid, error;
3702
3703 to_nodeid = dlm_dir_nodeid(r);
3704
3705 add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3706 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3707 if (error)
3708 goto fail;
3709
3710 send_args(r, lkb, ms);
3711
3712 error = send_message(mh, ms, r->res_name, r->res_length);
3713 if (error)
3714 goto fail;
3715 return 0;
3716
3717 fail:
3718 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3719 return error;
3720 }
3721
3722 static int send_remove(struct dlm_rsb *r)
3723 {
3724 struct dlm_message *ms;
3725 struct dlm_mhandle *mh;
3726 int to_nodeid, error;
3727
3728 to_nodeid = dlm_dir_nodeid(r);
3729
3730 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3731 if (error)
3732 goto out;
3733
3734 memcpy(ms->m_extra, r->res_name, r->res_length);
3735 ms->m_hash = cpu_to_le32(r->res_hash);
3736
3737 error = send_message(mh, ms, r->res_name, r->res_length);
3738 out:
3739 return error;
3740 }
3741
3742 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3743 int mstype, int rv)
3744 {
3745 struct dlm_message *ms;
3746 struct dlm_mhandle *mh;
3747 int to_nodeid, error;
3748
3749 to_nodeid = lkb->lkb_nodeid;
3750
3751 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3752 if (error)
3753 goto out;
3754
3755 send_args(r, lkb, ms);
3756
3757 ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3758
3759 error = send_message(mh, ms, r->res_name, r->res_length);
3760 out:
3761 return error;
3762 }
3763
3764 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3765 {
3766 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3767 }
3768
3769 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3770 {
3771 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3772 }
3773
3774 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3775 {
3776 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3777 }
3778
3779 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3780 {
3781 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3782 }
3783
3784 static int send_lookup_reply(struct dlm_ls *ls,
3785 const struct dlm_message *ms_in, int ret_nodeid,
3786 int rv)
3787 {
3788 struct dlm_rsb *r = &ls->ls_local_rsb;
3789 struct dlm_message *ms;
3790 struct dlm_mhandle *mh;
3791 int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
3792
3793 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3794 if (error)
3795 goto out;
3796
3797 ms->m_lkid = ms_in->m_lkid;
3798 ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3799 ms->m_nodeid = cpu_to_le32(ret_nodeid);
3800
3801 error = send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in));
3802 out:
3803 return error;
3804 }
3805
3806 /* which args we save from a received message depends heavily on the type
3807 of message, unlike the send side where we can safely send everything about
3808 the lkb for any type of message */
3809
3810 static void receive_flags(struct dlm_lkb *lkb, const struct dlm_message *ms)
3811 {
3812 lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
3813 dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3814 dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3815 }
3816
3817 static void receive_flags_reply(struct dlm_lkb *lkb,
3818 const struct dlm_message *ms,
3819 bool local)
3820 {
3821 if (local)
3822 return;
3823
3824 dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3825 dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3826 }
3827
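/* Number of bytes of variable-length data (resource name or lvb) that
   follow the fixed-size message structure. */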
3828 static int receive_extralen(const struct dlm_message *ms)
3829 {
3830 return (le16_to_cpu(ms->m_header.h_length) -
3831 sizeof(struct dlm_message));
3832 }
3833
3834 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3835 const struct dlm_message *ms)
3836 {
3837 int len;
3838
3839 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3840 if (!lkb->lkb_lvbptr)
3841 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3842 if (!lkb->lkb_lvbptr)
3843 return -ENOMEM;
3844 len = receive_extralen(ms);
3845 if (len > ls->ls_lvblen)
3846 len = ls->ls_lvblen;
3847 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3848 }
3849 return 0;
3850 }
3851
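/* Master copies record whether the owning node registered ast/bast
   callbacks by pointing at these placeholders; the real callbacks only
   run on the owning node, so these should never actually be called. */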
3852 static void fake_bastfn(void *astparam, int mode)
3853 {
3854 log_print("fake_bastfn should not be called");
3855 }
3856
3857 static void fake_astfn(void *astparam)
3858 {
3859 log_print("fake_astfn should not be called");
3860 }
3861
3862 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3863 const struct dlm_message *ms)
3864 {
3865 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3866 lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
3867 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3868 lkb->lkb_grmode = DLM_LOCK_IV;
3869 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3870
3871 lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;
3872 lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL;
3873
3874 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3875 /* lkb was just created so there won't be an lvb yet */
3876 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3877 if (!lkb->lkb_lvbptr)
3878 return -ENOMEM;
3879 }
3880
3881 return 0;
3882 }
3883
3884 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3885 const struct dlm_message *ms)
3886 {
3887 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3888 return -EBUSY;
3889
3890 if (receive_lvb(ls, lkb, ms))
3891 return -ENOMEM;
3892
3893 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3894 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
3895
3896 return 0;
3897 }
3898
3899 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3900 const struct dlm_message *ms)
3901 {
3902 if (receive_lvb(ls, lkb, ms))
3903 return -ENOMEM;
3904 return 0;
3905 }
3906
3907 /* We fill in the local-lkb fields with the info that send_xxxx_reply()
3908 uses to send a reply and that the remote end uses to process the reply. */
3909
3910 static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms)
3911 {
3912 struct dlm_lkb *lkb = &ls->ls_local_lkb;
3913 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3914 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3915 }
3916
3917 /* This is called after the rsb is locked so that we can safely inspect
3918 fields in the lkb. */
3919
3920 static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
3921 {
3922 int from = le32_to_cpu(ms->m_header.h_nodeid);
3923 int error = 0;
3924
3925 /* currently mixing of user/kernel locks is not supported */
3926 if (ms->m_flags & cpu_to_le32(BIT(DLM_DFL_USER_BIT)) &&
3927 !test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
3928 log_error(lkb->lkb_resource->res_ls,
3929 "got user dlm message for a kernel lock");
3930 error = -EINVAL;
3931 goto out;
3932 }
3933
3934 switch (ms->m_type) {
3935 case cpu_to_le32(DLM_MSG_CONVERT):
3936 case cpu_to_le32(DLM_MSG_UNLOCK):
3937 case cpu_to_le32(DLM_MSG_CANCEL):
3938 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3939 error = -EINVAL;
3940 break;
3941
3942 case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3943 case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
3944 case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
3945 case cpu_to_le32(DLM_MSG_GRANT):
3946 case cpu_to_le32(DLM_MSG_BAST):
3947 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3948 error = -EINVAL;
3949 break;
3950
3951 case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3952 if (!is_process_copy(lkb))
3953 error = -EINVAL;
3954 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3955 error = -EINVAL;
3956 break;
3957
3958 default:
3959 error = -EINVAL;
3960 }
3961
3962 out:
3963 if (error)
3964 log_error(lkb->lkb_resource->res_ls,
3965 "ignore invalid message %d from %d %x %x %x %d",
3966 le32_to_cpu(ms->m_type), from, lkb->lkb_id,
3967 lkb->lkb_remid, dlm_iflags_val(lkb),
3968 lkb->lkb_nodeid);
3969 return error;
3970 }
3971
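/* Master-side handling of a request: create a master-copy lkb, find or
   recreate the rsb, run do_request() and send the result back. */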
3972 static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms)
3973 {
3974 struct dlm_lkb *lkb;
3975 struct dlm_rsb *r;
3976 int from_nodeid;
3977 int error, namelen = 0;
3978
3979 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3980
3981 error = create_lkb(ls, &lkb);
3982 if (error)
3983 goto fail;
3984
3985 receive_flags(lkb, ms);
3986 set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
3987 error = receive_request_args(ls, lkb, ms);
3988 if (error) {
3989 __put_lkb(ls, lkb);
3990 goto fail;
3991 }
3992
3993 /* The dir node is the authority on whether we are the master
3994 for this rsb or not, so if the master sends us a request, we should
3995 recreate the rsb if we've destroyed it. This race happens when we
3996 send a remove message to the dir node at the same time that the dir
3997 node sends us a request for the rsb. */
3998
3999 namelen = receive_extralen(ms);
4000
4001 error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4002 R_RECEIVE_REQUEST, &r);
4003 if (error) {
4004 __put_lkb(ls, lkb);
4005 goto fail;
4006 }
4007
4008 lock_rsb(r);
4009
4010 if (r->res_master_nodeid != dlm_our_nodeid()) {
4011 error = validate_master_nodeid(ls, r, from_nodeid);
4012 if (error) {
4013 unlock_rsb(r);
4014 put_rsb(r);
4015 __put_lkb(ls, lkb);
4016 goto fail;
4017 }
4018 }
4019
4020 attach_lkb(r, lkb);
4021 error = do_request(r, lkb);
4022 send_request_reply(r, lkb, error);
4023 do_request_effects(r, lkb, error);
4024
4025 unlock_rsb(r);
4026 put_rsb(r);
4027
4028 if (error == -EINPROGRESS)
4029 error = 0;
4030 if (error)
4031 dlm_put_lkb(lkb);
4032 return 0;
4033
4034 fail:
4035 /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4036 and do this receive_request again from process_lookup_list once
4037 we get the lookup reply. This would avoid many repeated
4038 ENOTBLK request failures when the lookup reply designating us
4039 as master is delayed. */
4040
4041 if (error != -ENOTBLK) {
4042 log_limit(ls, "receive_request %x from %d %d",
4043 le32_to_cpu(ms->m_lkid), from_nodeid, error);
4044 }
4045
4046 setup_local_lkb(ls, ms);
4047 send_request_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4048 return error;
4049 }
4050
4051 static int receive_convert(struct dlm_ls *ls, const struct dlm_message *ms)
4052 {
4053 struct dlm_lkb *lkb;
4054 struct dlm_rsb *r;
4055 int error, reply = 1;
4056
4057 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4058 if (error)
4059 goto fail;
4060
4061 if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4062 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4063 "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4064 (unsigned long long)lkb->lkb_recover_seq,
4065 le32_to_cpu(ms->m_header.h_nodeid),
4066 le32_to_cpu(ms->m_lkid));
4067 error = -ENOENT;
4068 dlm_put_lkb(lkb);
4069 goto fail;
4070 }
4071
4072 r = lkb->lkb_resource;
4073
4074 hold_rsb(r);
4075 lock_rsb(r);
4076
4077 error = validate_message(lkb, ms);
4078 if (error)
4079 goto out;
4080
4081 receive_flags(lkb, ms);
4082
4083 error = receive_convert_args(ls, lkb, ms);
4084 if (error) {
4085 send_convert_reply(r, lkb, error);
4086 goto out;
4087 }
4088
4089 reply = !down_conversion(lkb);
4090
4091 error = do_convert(r, lkb);
4092 if (reply)
4093 send_convert_reply(r, lkb, error);
4094 do_convert_effects(r, lkb, error);
4095 out:
4096 unlock_rsb(r);
4097 put_rsb(r);
4098 dlm_put_lkb(lkb);
4099 return 0;
4100
4101 fail:
4102 setup_local_lkb(ls, ms);
4103 send_convert_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4104 return error;
4105 }
4106
4107 static int receive_unlock(struct dlm_ls *ls, const struct dlm_message *ms)
4108 {
4109 struct dlm_lkb *lkb;
4110 struct dlm_rsb *r;
4111 int error;
4112
4113 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4114 if (error)
4115 goto fail;
4116
4117 if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4118 log_error(ls, "receive_unlock %x remid %x remote %d %x",
4119 lkb->lkb_id, lkb->lkb_remid,
4120 le32_to_cpu(ms->m_header.h_nodeid),
4121 le32_to_cpu(ms->m_lkid));
4122 error = -ENOENT;
4123 dlm_put_lkb(lkb);
4124 goto fail;
4125 }
4126
4127 r = lkb->lkb_resource;
4128
4129 hold_rsb(r);
4130 lock_rsb(r);
4131
4132 error = validate_message(lkb, ms);
4133 if (error)
4134 goto out;
4135
4136 receive_flags(lkb, ms);
4137
4138 error = receive_unlock_args(ls, lkb, ms);
4139 if (error) {
4140 send_unlock_reply(r, lkb, error);
4141 goto out;
4142 }
4143
4144 error = do_unlock(r, lkb);
4145 send_unlock_reply(r, lkb, error);
4146 do_unlock_effects(r, lkb, error);
4147 out:
4148 unlock_rsb(r);
4149 put_rsb(r);
4150 dlm_put_lkb(lkb);
4151 return 0;
4152
4153 fail:
4154 setup_local_lkb(ls, ms);
4155 send_unlock_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4156 return error;
4157 }
4158
4159 static int receive_cancel(struct dlm_ls *ls, const struct dlm_message *ms)
4160 {
4161 struct dlm_lkb *lkb;
4162 struct dlm_rsb *r;
4163 int error;
4164
4165 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4166 if (error)
4167 goto fail;
4168
4169 receive_flags(lkb, ms);
4170
4171 r = lkb->lkb_resource;
4172
4173 hold_rsb(r);
4174 lock_rsb(r);
4175
4176 error = validate_message(lkb, ms);
4177 if (error)
4178 goto out;
4179
4180 error = do_cancel(r, lkb);
4181 send_cancel_reply(r, lkb, error);
4182 do_cancel_effects(r, lkb, error);
4183 out:
4184 unlock_rsb(r);
4185 put_rsb(r);
4186 dlm_put_lkb(lkb);
4187 return 0;
4188
4189 fail:
4190 setup_local_lkb(ls, ms);
4191 send_cancel_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4192 return error;
4193 }
4194
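/* A grant message from the master: move our copy of the lock to the
   granted state and queue the completion ast. */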
4195 static int receive_grant(struct dlm_ls *ls, const struct dlm_message *ms)
4196 {
4197 struct dlm_lkb *lkb;
4198 struct dlm_rsb *r;
4199 int error;
4200
4201 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4202 if (error)
4203 return error;
4204
4205 r = lkb->lkb_resource;
4206
4207 hold_rsb(r);
4208 lock_rsb(r);
4209
4210 error = validate_message(lkb, ms);
4211 if (error)
4212 goto out;
4213
4214 receive_flags_reply(lkb, ms, false);
4215 if (is_altmode(lkb))
4216 munge_altmode(lkb, ms);
4217 grant_lock_pc(r, lkb, ms);
4218 queue_cast(r, lkb, 0);
4219 out:
4220 unlock_rsb(r);
4221 put_rsb(r);
4222 dlm_put_lkb(lkb);
4223 return 0;
4224 }
4225
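/* A bast message from the master: queue a blocking callback for the
   requested mode and remember it in lkb_highbast. */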
4226 static int receive_bast(struct dlm_ls *ls, const struct dlm_message *ms)
4227 {
4228 struct dlm_lkb *lkb;
4229 struct dlm_rsb *r;
4230 int error;
4231
4232 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4233 if (error)
4234 return error;
4235
4236 r = lkb->lkb_resource;
4237
4238 hold_rsb(r);
4239 lock_rsb(r);
4240
4241 error = validate_message(lkb, ms);
4242 if (error)
4243 goto out;
4244
4245 queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode));
4246 lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
4247 out:
4248 unlock_rsb(r);
4249 put_rsb(r);
4250 dlm_put_lkb(lkb);
4251 return 0;
4252 }
4253
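/* Directory-node handling of a lookup: report the master nodeid, or
   handle the lookup as a request directly if we are the master. */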
4254 static void receive_lookup(struct dlm_ls *ls, const struct dlm_message *ms)
4255 {
4256 int len, error, ret_nodeid, from_nodeid, our_nodeid;
4257
4258 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4259 our_nodeid = dlm_our_nodeid();
4260
4261 len = receive_extralen(ms);
4262
4263 error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4264 &ret_nodeid, NULL);
4265
4266 /* Optimization: we're master so treat lookup as a request */
4267 if (!error && ret_nodeid == our_nodeid) {
4268 receive_request(ls, ms);
4269 return;
4270 }
4271 send_lookup_reply(ls, ms, ret_nodeid, error);
4272 }
4273
4274 static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
4275 {
4276 char name[DLM_RESNAME_MAXLEN+1];
4277 struct dlm_rsb *r;
4278 int rv, len, dir_nodeid, from_nodeid;
4279
4280 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4281
4282 len = receive_extralen(ms);
4283
4284 if (len > DLM_RESNAME_MAXLEN) {
4285 log_error(ls, "receive_remove from %d bad len %d",
4286 from_nodeid, len);
4287 return;
4288 }
4289
4290 dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash));
4291 if (dir_nodeid != dlm_our_nodeid()) {
4292 log_error(ls, "receive_remove from %d bad nodeid %d",
4293 from_nodeid, dir_nodeid);
4294 return;
4295 }
4296
4297 /*
4298 * Look for inactive rsb, if it's there, free it.
4299 * If the rsb is active, it's being used, and we should ignore this
4300 * message. This is an expected race between the dir node sending a
4301 * request to the master node at the same time as the master node sends
4302 * a remove to the dir node. The resolution to that race is for the
4303 * dir node to ignore the remove message, and the master node to
4304 * recreate the master rsb when it gets a request from the dir node for
4305 * an rsb it doesn't have.
4306 */
4307
4308 memset(name, 0, sizeof(name));
4309 memcpy(name, ms->m_extra, len);
4310
4311 rcu_read_lock();
4312 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
4313 if (rv) {
4314 rcu_read_unlock();
4315 /* should not happen */
4316 log_error(ls, "%s from %d not found %s", __func__,
4317 from_nodeid, name);
4318 return;
4319 }
4320
4321 write_lock_bh(&ls->ls_rsbtbl_lock);
4322 if (!rsb_flag(r, RSB_HASHED)) {
4323 rcu_read_unlock();
4324 write_unlock_bh(&ls->ls_rsbtbl_lock);
4325 /* should not happen */
4326 log_error(ls, "%s from %d got removed during removal %s",
4327 __func__, from_nodeid, name);
4328 return;
4329 }
4330 /* at this stage the rsb can only be freed here */
4331 rcu_read_unlock();
4332
4333 if (!rsb_flag(r, RSB_INACTIVE)) {
4334 if (r->res_master_nodeid != from_nodeid) {
4335 /* should not happen */
4336 log_error(ls, "receive_remove on active rsb from %d master %d",
4337 from_nodeid, r->res_master_nodeid);
4338 dlm_print_rsb(r);
4339 write_unlock_bh(&ls->ls_rsbtbl_lock);
4340 return;
4341 }
4342
4343 /* Ignore the remove message, see race comment above. */
4344
4345 log_debug(ls, "receive_remove from %d master %d first %x %s",
4346 from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4347 name);
4348 write_unlock_bh(&ls->ls_rsbtbl_lock);
4349 return;
4350 }
4351
4352 if (r->res_master_nodeid != from_nodeid) {
4353 log_error(ls, "receive_remove inactive from %d master %d",
4354 from_nodeid, r->res_master_nodeid);
4355 dlm_print_rsb(r);
4356 write_unlock_bh(&ls->ls_rsbtbl_lock);
4357 return;
4358 }
4359
4360 list_del(&r->res_slow_list);
4361 rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
4362 dlm_rhash_rsb_params);
4363 rsb_clear_flag(r, RSB_HASHED);
4364 write_unlock_bh(&ls->ls_rsbtbl_lock);
4365
4366 free_inactive_rsb(r);
4367 }
4368
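/* A purge message asks us to drop locks orphaned by the given process
   (nodeid/pid); the actual cleanup is done in do_purge(). */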
4369 static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms)
4370 {
4371 do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
4372 }
4373
4374 static int receive_request_reply(struct dlm_ls *ls,
4375 const struct dlm_message *ms)
4376 {
4377 struct dlm_lkb *lkb;
4378 struct dlm_rsb *r;
4379 int error, mstype, result;
4380 int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4381
4382 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4383 if (error)
4384 return error;
4385
4386 r = lkb->lkb_resource;
4387 hold_rsb(r);
4388 lock_rsb(r);
4389
4390 error = validate_message(lkb, ms);
4391 if (error)
4392 goto out;
4393
4394 mstype = lkb->lkb_wait_type;
4395 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4396 if (error) {
4397 log_error(ls, "receive_request_reply %x remote %d %x result %d",
4398 lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid),
4399 from_dlm_errno(le32_to_cpu(ms->m_result)));
4400 dlm_dump_rsb(r);
4401 goto out;
4402 }
4403
4404 /* Optimization: the dir node was also the master, so it took our
4405 lookup as a request and sent request reply instead of lookup reply */
4406 if (mstype == DLM_MSG_LOOKUP) {
4407 r->res_master_nodeid = from_nodeid;
4408 r->res_nodeid = from_nodeid;
4409 lkb->lkb_nodeid = from_nodeid;
4410 }
4411
4412 /* this is the value returned from do_request() on the master */
4413 result = from_dlm_errno(le32_to_cpu(ms->m_result));
4414
4415 switch (result) {
4416 case -EAGAIN:
4417 /* request would block (be queued) on remote master */
4418 queue_cast(r, lkb, -EAGAIN);
4419 confirm_master(r, -EAGAIN);
4420 unhold_lkb(lkb); /* undoes create_lkb() */
4421 break;
4422
4423 case -EINPROGRESS:
4424 case 0:
4425 /* request was queued or granted on remote master */
4426 receive_flags_reply(lkb, ms, false);
4427 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
4428 if (is_altmode(lkb))
4429 munge_altmode(lkb, ms);
4430 if (result) {
4431 add_lkb(r, lkb, DLM_LKSTS_WAITING);
4432 } else {
4433 grant_lock_pc(r, lkb, ms);
4434 queue_cast(r, lkb, 0);
4435 }
4436 confirm_master(r, result);
4437 break;
4438
4439 case -EBADR:
4440 case -ENOTBLK:
4441 /* find_rsb failed to find rsb or rsb wasn't master */
4442 log_limit(ls, "receive_request_reply %x from %d %d "
4443 "master %d dir %d first %x %s", lkb->lkb_id,
4444 from_nodeid, result, r->res_master_nodeid,
4445 r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4446
4447 if (r->res_dir_nodeid != dlm_our_nodeid() &&
4448 r->res_master_nodeid != dlm_our_nodeid()) {
4449 /* cause _request_lock->set_master->send_lookup */
4450 r->res_master_nodeid = 0;
4451 r->res_nodeid = -1;
4452 lkb->lkb_nodeid = -1;
4453 }
4454
4455 if (is_overlap(lkb)) {
4456 /* we'll ignore error in cancel/unlock reply */
4457 queue_cast_overlap(r, lkb);
4458 confirm_master(r, result);
4459 unhold_lkb(lkb); /* undoes create_lkb() */
4460 } else {
4461 _request_lock(r, lkb);
4462
4463 if (r->res_master_nodeid == dlm_our_nodeid())
4464 confirm_master(r, 0);
4465 }
4466 break;
4467
4468 default:
4469 log_error(ls, "receive_request_reply %x error %d",
4470 lkb->lkb_id, result);
4471 }
4472
4473 if ((result == 0 || result == -EINPROGRESS) &&
4474 test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
4475 log_debug(ls, "receive_request_reply %x result %d unlock",
4476 lkb->lkb_id, result);
4477 clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4478 send_unlock(r, lkb);
4479 } else if ((result == -EINPROGRESS) &&
4480 test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
4481 &lkb->lkb_iflags)) {
4482 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4483 clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4484 send_cancel(r, lkb);
4485 } else {
4486 clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4487 clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4488 }
4489 out:
4490 unlock_rsb(r);
4491 put_rsb(r);
4492 dlm_put_lkb(lkb);
4493 return 0;
4494 }
4495
4496 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4497 const struct dlm_message *ms, bool local)
4498 {
4499 /* this is the value returned from do_convert() on the master */
4500 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4501 case -EAGAIN:
4502 /* convert would block (be queued) on remote master */
4503 queue_cast(r, lkb, -EAGAIN);
4504 break;
4505
4506 case -EDEADLK:
4507 receive_flags_reply(lkb, ms, local);
4508 revert_lock_pc(r, lkb);
4509 queue_cast(r, lkb, -EDEADLK);
4510 break;
4511
4512 case -EINPROGRESS:
4513 /* convert was queued on remote master */
4514 receive_flags_reply(lkb, ms, local);
4515 if (is_demoted(lkb))
4516 munge_demoted(lkb);
4517 del_lkb(r, lkb);
4518 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4519 break;
4520
4521 case 0:
4522 /* convert was granted on remote master */
4523 receive_flags_reply(lkb, ms, local);
4524 if (is_demoted(lkb))
4525 munge_demoted(lkb);
4526 grant_lock_pc(r, lkb, ms);
4527 queue_cast(r, lkb, 0);
4528 break;
4529
4530 default:
4531 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4532 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4533 le32_to_cpu(ms->m_lkid),
4534 from_dlm_errno(le32_to_cpu(ms->m_result)));
4535 dlm_print_rsb(r);
4536 dlm_print_lkb(lkb);
4537 }
4538 }
4539
4540 static void _receive_convert_reply(struct dlm_lkb *lkb,
4541 const struct dlm_message *ms, bool local)
4542 {
4543 struct dlm_rsb *r = lkb->lkb_resource;
4544 int error;
4545
4546 hold_rsb(r);
4547 lock_rsb(r);
4548
4549 error = validate_message(lkb, ms);
4550 if (error)
4551 goto out;
4552
4553 error = remove_from_waiters_ms(lkb, ms, local);
4554 if (error)
4555 goto out;
4556
4557 __receive_convert_reply(r, lkb, ms, local);
4558 out:
4559 unlock_rsb(r);
4560 put_rsb(r);
4561 }
4562
4563 static int receive_convert_reply(struct dlm_ls *ls,
4564 const struct dlm_message *ms)
4565 {
4566 struct dlm_lkb *lkb;
4567 int error;
4568
4569 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4570 if (error)
4571 return error;
4572
4573 _receive_convert_reply(lkb, ms, false);
4574 dlm_put_lkb(lkb);
4575 return 0;
4576 }
4577
4578 static void _receive_unlock_reply(struct dlm_lkb *lkb,
4579 const struct dlm_message *ms, bool local)
4580 {
4581 struct dlm_rsb *r = lkb->lkb_resource;
4582 int error;
4583
4584 hold_rsb(r);
4585 lock_rsb(r);
4586
4587 error = validate_message(lkb, ms);
4588 if (error)
4589 goto out;
4590
4591 error = remove_from_waiters_ms(lkb, ms, local);
4592 if (error)
4593 goto out;
4594
4595 /* this is the value returned from do_unlock() on the master */
4596
4597 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4598 case -DLM_EUNLOCK:
4599 receive_flags_reply(lkb, ms, local);
4600 remove_lock_pc(r, lkb);
4601 queue_cast(r, lkb, -DLM_EUNLOCK);
4602 break;
4603 case -ENOENT:
4604 break;
4605 default:
4606 log_error(r->res_ls, "receive_unlock_reply %x error %d",
4607 lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
4608 }
4609 out:
4610 unlock_rsb(r);
4611 put_rsb(r);
4612 }
4613
4614 static int receive_unlock_reply(struct dlm_ls *ls,
4615 const struct dlm_message *ms)
4616 {
4617 struct dlm_lkb *lkb;
4618 int error;
4619
4620 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4621 if (error)
4622 return error;
4623
4624 _receive_unlock_reply(lkb, ms, false);
4625 dlm_put_lkb(lkb);
4626 return 0;
4627 }
4628
4629 static void _receive_cancel_reply(struct dlm_lkb *lkb,
4630 const struct dlm_message *ms, bool local)
4631 {
4632 struct dlm_rsb *r = lkb->lkb_resource;
4633 int error;
4634
4635 hold_rsb(r);
4636 lock_rsb(r);
4637
4638 error = validate_message(lkb, ms);
4639 if (error)
4640 goto out;
4641
4642 error = remove_from_waiters_ms(lkb, ms, local);
4643 if (error)
4644 goto out;
4645
4646 /* this is the value returned from do_cancel() on the master */
4647
4648 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4649 case -DLM_ECANCEL:
4650 receive_flags_reply(lkb, ms, local);
4651 revert_lock_pc(r, lkb);
4652 queue_cast(r, lkb, -DLM_ECANCEL);
4653 break;
4654 case 0:
4655 break;
4656 default:
4657 log_error(r->res_ls, "receive_cancel_reply %x error %d",
4658 lkb->lkb_id,
4659 from_dlm_errno(le32_to_cpu(ms->m_result)));
4660 }
4661 out:
4662 unlock_rsb(r);
4663 put_rsb(r);
4664 }
4665
4666 static int receive_cancel_reply(struct dlm_ls *ls,
4667 const struct dlm_message *ms)
4668 {
4669 struct dlm_lkb *lkb;
4670 int error;
4671
4672 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4673 if (error)
4674 return error;
4675
4676 _receive_cancel_reply(lkb, ms, false);
4677 dlm_put_lkb(lkb);
4678 return 0;
4679 }
4680
4681 static void receive_lookup_reply(struct dlm_ls *ls,
4682 const struct dlm_message *ms)
4683 {
4684 struct dlm_lkb *lkb;
4685 struct dlm_rsb *r;
4686 int error, ret_nodeid;
4687 int do_lookup_list = 0;
4688
4689 error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
4690 if (error) {
4691 log_error(ls, "%s no lkid %x", __func__,
4692 le32_to_cpu(ms->m_lkid));
4693 return;
4694 }
4695
4696 /* ms->m_result is the value returned by dlm_master_lookup on dir node
4697 FIXME: will a non-zero error ever be returned? */
4698
4699 r = lkb->lkb_resource;
4700 hold_rsb(r);
4701 lock_rsb(r);
4702
4703 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4704 if (error)
4705 goto out;
4706
4707 ret_nodeid = le32_to_cpu(ms->m_nodeid);
4708
4709 /* We sometimes receive a request from the dir node for this
4710 rsb before we've received the dir node's lookup_reply for it.
4711 The request from the dir node implies we're the master, so we set
4712 ourselves as master in receive_request_reply, and verify here that
4713 we are indeed the master. */
4714
4715 if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4716 /* This should never happen */
4717 log_error(ls, "receive_lookup_reply %x from %d ret %d "
4718 "master %d dir %d our %d first %x %s",
4719 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4720 ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
4721 dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4722 }
4723
4724 if (ret_nodeid == dlm_our_nodeid()) {
4725 r->res_master_nodeid = ret_nodeid;
4726 r->res_nodeid = 0;
4727 do_lookup_list = 1;
4728 r->res_first_lkid = 0;
4729 } else if (ret_nodeid == -1) {
4730 /* the remote node doesn't believe it's the dir node */
4731 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4732 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
4733 r->res_master_nodeid = 0;
4734 r->res_nodeid = -1;
4735 lkb->lkb_nodeid = -1;
4736 } else {
4737 /* set_master() will set lkb_nodeid from r */
4738 r->res_master_nodeid = ret_nodeid;
4739 r->res_nodeid = ret_nodeid;
4740 }
4741
4742 if (is_overlap(lkb)) {
4743 log_debug(ls, "receive_lookup_reply %x unlock %x",
4744 lkb->lkb_id, dlm_iflags_val(lkb));
4745 queue_cast_overlap(r, lkb);
4746 unhold_lkb(lkb); /* undoes create_lkb() */
4747 goto out_list;
4748 }
4749
4750 _request_lock(r, lkb);
4751
4752 out_list:
4753 if (do_lookup_list)
4754 process_lookup_list(r);
4755 out:
4756 unlock_rsb(r);
4757 put_rsb(r);
4758 dlm_put_lkb(lkb);
4759 }
4760
4761 static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4762 uint32_t saved_seq)
4763 {
4764 int error = 0, noent = 0;
4765
4766 if (WARN_ON_ONCE(!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid)))) {
4767 log_limit(ls, "receive %d from non-member %d %x %x %d",
4768 le32_to_cpu(ms->m_type),
4769 le32_to_cpu(ms->m_header.h_nodeid),
4770 le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4771 from_dlm_errno(le32_to_cpu(ms->m_result)));
4772 return;
4773 }
4774
4775 switch (ms->m_type) {
4776
4777 /* messages sent to a master node */
4778
4779 case cpu_to_le32(DLM_MSG_REQUEST):
4780 error = receive_request(ls, ms);
4781 break;
4782
4783 case cpu_to_le32(DLM_MSG_CONVERT):
4784 error = receive_convert(ls, ms);
4785 break;
4786
4787 case cpu_to_le32(DLM_MSG_UNLOCK):
4788 error = receive_unlock(ls, ms);
4789 break;
4790
4791 case cpu_to_le32(DLM_MSG_CANCEL):
4792 noent = 1;
4793 error = receive_cancel(ls, ms);
4794 break;
4795
4796 /* messages sent from a master node (replies to above) */
4797
4798 case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
4799 error = receive_request_reply(ls, ms);
4800 break;
4801
4802 case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
4803 error = receive_convert_reply(ls, ms);
4804 break;
4805
4806 case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
4807 error = receive_unlock_reply(ls, ms);
4808 break;
4809
4810 case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
4811 error = receive_cancel_reply(ls, ms);
4812 break;
4813
4814 /* messages sent from a master node (only two types of async msg) */
4815
4816 case cpu_to_le32(DLM_MSG_GRANT):
4817 noent = 1;
4818 error = receive_grant(ls, ms);
4819 break;
4820
4821 case cpu_to_le32(DLM_MSG_BAST):
4822 noent = 1;
4823 error = receive_bast(ls, ms);
4824 break;
4825
4826 /* messages sent to a dir node */
4827
4828 case cpu_to_le32(DLM_MSG_LOOKUP):
4829 receive_lookup(ls, ms);
4830 break;
4831
4832 case cpu_to_le32(DLM_MSG_REMOVE):
4833 receive_remove(ls, ms);
4834 break;
4835
4836 /* messages sent from a dir node (remove has no reply) */
4837
4838 case cpu_to_le32(DLM_MSG_LOOKUP_REPLY):
4839 receive_lookup_reply(ls, ms);
4840 break;
4841
4842 /* other messages */
4843
4844 case cpu_to_le32(DLM_MSG_PURGE):
4845 receive_purge(ls, ms);
4846 break;
4847
4848 default:
4849 log_error(ls, "unknown message type %d",
4850 le32_to_cpu(ms->m_type));
4851 }
4852
4853 /*
4854 * When checking for ENOENT, we're checking the result of
4855 * find_lkb(m_remid):
4856 *
4857 * The lock id referenced in the message wasn't found. This may
4858 * happen in normal usage for the async messages and cancel, so
4859 * only use log_debug for them.
4860 *
4861 * Some errors are expected and normal.
4862 */
4863
4864 if (error == -ENOENT && noent) {
4865 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4866 le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4867 le32_to_cpu(ms->m_header.h_nodeid),
4868 le32_to_cpu(ms->m_lkid), saved_seq);
4869 } else if (error == -ENOENT) {
4870 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4871 le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4872 le32_to_cpu(ms->m_header.h_nodeid),
4873 le32_to_cpu(ms->m_lkid), saved_seq);
4874
4875 if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT))
4876 dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash));
4877 }
4878
4879 if (error == -EINVAL) {
4880 log_error(ls, "receive %d inval from %d lkid %x remid %x "
4881 "saved_seq %u",
4882 le32_to_cpu(ms->m_type),
4883 le32_to_cpu(ms->m_header.h_nodeid),
4884 le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4885 saved_seq);
4886 }
4887 }
4888
4889 /* If the lockspace is in recovery mode (locking stopped), then normal
4890 messages are saved on the requestqueue for processing after recovery is
4891 done. When not in recovery mode, we wait for dlm_recoverd to drain saved
4892 messages off the requestqueue before we process new ones. This occurs right
4893 after recovery completes when we transition from saving all messages on
4894 requestqueue, to processing all the saved messages, to processing new
4895 messages as they arrive. */
4896
4897 static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4898 int nodeid)
4899 {
4900 try_again:
4901 read_lock_bh(&ls->ls_requestqueue_lock);
4902 if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4903 /* If we were a member of this lockspace, left, and rejoined,
4904 other nodes may still be sending us messages from the
4905 lockspace generation before we left. */
4906 if (WARN_ON_ONCE(!ls->ls_generation)) {
4907 read_unlock_bh(&ls->ls_requestqueue_lock);
4908 log_limit(ls, "receive %d from %d ignore old gen",
4909 le32_to_cpu(ms->m_type), nodeid);
4910 return;
4911 }
4912
4913 read_unlock_bh(&ls->ls_requestqueue_lock);
4914 write_lock_bh(&ls->ls_requestqueue_lock);
4915 /* recheck because we hold the write lock now */
4916 if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4917 write_unlock_bh(&ls->ls_requestqueue_lock);
4918 goto try_again;
4919 }
4920
4921 dlm_add_requestqueue(ls, nodeid, ms);
4922 write_unlock_bh(&ls->ls_requestqueue_lock);
4923 } else {
4924 _receive_message(ls, ms, 0);
4925 read_unlock_bh(&ls->ls_requestqueue_lock);
4926 }
4927 }
4928
4929 /* This is called by dlm_recoverd to process messages that were saved on
4930 the requestqueue. */
4931
4932 void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
4933 uint32_t saved_seq)
4934 {
4935 _receive_message(ls, ms, saved_seq);
4936 }
4937
4938 /* This is called by the midcomms layer when something is received for
4939 the lockspace. It could be either a MSG (normal message sent as part of
4940 standard locking activity) or an RCOM (recovery message sent as part of
4941 lockspace recovery). */
4942
4943 void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
4944 {
4945 const struct dlm_header *hd = &p->header;
4946 struct dlm_ls *ls;
4947 int type = 0;
4948
4949 switch (hd->h_cmd) {
4950 case DLM_MSG:
4951 type = le32_to_cpu(p->message.m_type);
4952 break;
4953 case DLM_RCOM:
4954 type = le32_to_cpu(p->rcom.rc_type);
4955 break;
4956 default:
4957 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4958 return;
4959 }
4960
4961 if (le32_to_cpu(hd->h_nodeid) != nodeid) {
4962 log_print("invalid h_nodeid %d from %d lockspace %x",
4963 le32_to_cpu(hd->h_nodeid), nodeid,
4964 le32_to_cpu(hd->u.h_lockspace));
4965 return;
4966 }
4967
4968 ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace));
4969 if (!ls) {
4970 if (dlm_config.ci_log_debug) {
4971 printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
4972 "%u from %d cmd %d type %d\n",
4973 le32_to_cpu(hd->u.h_lockspace), nodeid,
4974 hd->h_cmd, type);
4975 }
4976
4977 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4978 dlm_send_ls_not_ready(nodeid, &p->rcom);
4979 return;
4980 }
4981
4982 /* this rwlock allows dlm_ls_stop() to wait for all dlm_recv threads to
4983 be inactive (in this ls) before transitioning to recovery mode */
4984
4985 read_lock_bh(&ls->ls_recv_active);
4986 if (hd->h_cmd == DLM_MSG)
4987 dlm_receive_message(ls, &p->message, nodeid);
4988 else if (hd->h_cmd == DLM_RCOM)
4989 dlm_receive_rcom(ls, &p->rcom, nodeid);
4990 else
4991 log_error(ls, "invalid h_cmd %d from %d lockspace %x",
4992 hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
4993 read_unlock_bh(&ls->ls_recv_active);
4994
4995 dlm_put_lockspace(ls);
4996 }
4997
4998 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4999 struct dlm_message *ms_local)
5000 {
5001 if (middle_conversion(lkb)) {
5002 log_rinfo(ls, "%s %x middle convert in progress", __func__,
5003 lkb->lkb_id);
5004
5005 /* We sent this lock to the new master. The new master will
5006 * tell us when it's granted. We no longer need a reply, so
5007 * use a fake reply to put the lkb into the right state.
5008 */
5009 hold_lkb(lkb);
5010 memset(ms_local, 0, sizeof(struct dlm_message));
5011 ms_local->m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
5012 ms_local->m_result = cpu_to_le32(to_dlm_errno(-EINPROGRESS));
5013 ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5014 _receive_convert_reply(lkb, ms_local, true);
5015 unhold_lkb(lkb);
5016
5017 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
5018 set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5019 }
5020
5021 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5022 conversions are async; there's no reply from the remote master */
5023 }
5024
5025 /* A waiting lkb needs recovery if the master node has failed, or
5026 the master node is changing (only when no directory is used) */
5027
5028 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5029 int dir_nodeid)
5030 {
5031 if (dlm_no_directory(ls))
5032 return 1;
5033
5034 if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5035 return 1;
5036
5037 return 0;
5038 }
5039
5040 /* Recovery for locks that are waiting for replies from nodes that are now
5041 gone. We can just complete unlocks and cancels by faking a reply from the
5042 dead node. Requests and up-conversions we flag to be resent after
5043 recovery. Down-conversions can just be completed with a fake reply like
5044 unlocks. Conversions between PR and CW need special attention. */
5045
5046 void dlm_recover_waiters_pre(struct dlm_ls *ls)
5047 {
5048 struct dlm_lkb *lkb, *safe;
5049 struct dlm_message *ms_local;
5050 int wait_type, local_unlock_result, local_cancel_result;
5051 int dir_nodeid;
5052
5053 ms_local = kmalloc(sizeof(*ms_local), GFP_KERNEL);
5054 if (!ms_local)
5055 return;
5056
5057 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5058
5059 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5060
5061 /* exclude debug messages about unlocks because there can be so
5062 many and they aren't very interesting */
5063
5064 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5065 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5066 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5067 lkb->lkb_id,
5068 lkb->lkb_remid,
5069 lkb->lkb_wait_type,
5070 lkb->lkb_resource->res_nodeid,
5071 lkb->lkb_nodeid,
5072 lkb->lkb_wait_nodeid,
5073 dir_nodeid);
5074 }
5075
5076 /* all outstanding lookups, regardless of destination, will be
5077 resent after recovery is done */
5078
5079 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5080 set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5081 continue;
5082 }
5083
5084 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5085 continue;
5086
5087 wait_type = lkb->lkb_wait_type;
5088 local_unlock_result = -DLM_EUNLOCK;
5089 local_cancel_result = -DLM_ECANCEL;
5090
5091 /* Main reply may have been received leaving a zero wait_type,
5092 but a reply for the overlapping op may not have been
5093 received. In that case we need to fake the appropriate
5094 reply for the overlap op. */
5095
5096 if (!wait_type) {
5097 if (is_overlap_cancel(lkb)) {
5098 wait_type = DLM_MSG_CANCEL;
5099 if (lkb->lkb_grmode == DLM_LOCK_IV)
5100 local_cancel_result = 0;
5101 }
5102 if (is_overlap_unlock(lkb)) {
5103 wait_type = DLM_MSG_UNLOCK;
5104 if (lkb->lkb_grmode == DLM_LOCK_IV)
5105 local_unlock_result = -ENOENT;
5106 }
5107
5108 log_debug(ls, "rwpre overlap %x %x %d %d %d",
5109 lkb->lkb_id, dlm_iflags_val(lkb), wait_type,
5110 local_cancel_result, local_unlock_result);
5111 }
5112
5113 switch (wait_type) {
5114
5115 case DLM_MSG_REQUEST:
5116 set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5117 break;
5118
5119 case DLM_MSG_CONVERT:
5120 recover_convert_waiter(ls, lkb, ms_local);
5121 break;
5122
5123 case DLM_MSG_UNLOCK:
5124 hold_lkb(lkb);
5125 memset(ms_local, 0, sizeof(struct dlm_message));
5126 ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
5127 ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result));
5128 ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5129 _receive_unlock_reply(lkb, ms_local, true);
5130 dlm_put_lkb(lkb);
5131 break;
5132
5133 case DLM_MSG_CANCEL:
5134 hold_lkb(lkb);
5135 memset(ms_local, 0, sizeof(struct dlm_message));
5136 ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
5137 ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result));
5138 ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5139 _receive_cancel_reply(lkb, ms_local, true);
5140 dlm_put_lkb(lkb);
5141 break;
5142
5143 default:
5144 log_error(ls, "invalid lkb wait_type %d %d",
5145 lkb->lkb_wait_type, wait_type);
5146 }
5147 schedule();
5148 }
5149 kfree(ms_local);
5150 }
5151
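/* Pick the next lkb on the waiters list that dlm_recover_waiters_pre()
   flagged for resending; takes a reference that the caller drops. */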
5152 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5153 {
5154 struct dlm_lkb *lkb = NULL, *iter;
5155
5156 spin_lock_bh(&ls->ls_waiters_lock);
5157 list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
5158 if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
5159 hold_lkb(iter);
5160 lkb = iter;
5161 break;
5162 }
5163 }
5164 spin_unlock_bh(&ls->ls_waiters_lock);
5165
5166 return lkb;
5167 }
5168
5169 /*
5170 * Forced state reset for locks that were in the middle of remote operations
5171 * when recovery happened (i.e. lkbs that were on the waiters list, waiting
5172 * for a reply from a remote operation.) The lkbs remaining on the waiters
5173 * list need to be reevaluated; some may need resending to a different node
5174 * than previously, and some may now need local handling rather than remote.
5175 *
5176 * First, the lkb state for the voided remote operation is forcibly reset,
5177 * equivalent to what remove_from_waiters() would normally do:
5178 * . lkb removed from ls_waiters list
5179 * . lkb wait_type cleared
5180 * . lkb waiters_count cleared
5181 * . lkb ref count decremented for each waiters_count (almost always 1,
5182 * but possibly 2 in case of cancel/unlock overlapping, which means
5183 * two remote replies were being expected for the lkb.)
5184 *
5185 * Second, the lkb is reprocessed like an original operation would be,
5186 * by passing it to _request_lock or _convert_lock, which will either
5187 * process the lkb operation locally, or send it to a remote node again
5188 * and put the lkb back onto the waiters list.
5189 *
5190 * When reprocessing the lkb, we may find that it's flagged for an overlapping
5191 * force-unlock or cancel, either from before recovery began, or after recovery
5192 * finished. If this is the case, the unlock/cancel is done directly, and the
5193 * original operation is not initiated again (no _request_lock/_convert_lock.)
5194 */
5195
5196 int dlm_recover_waiters_post(struct dlm_ls *ls)
5197 {
5198 struct dlm_lkb *lkb;
5199 struct dlm_rsb *r;
5200 int error = 0, mstype, err, oc, ou;
5201
5202 while (1) {
5203 if (dlm_locking_stopped(ls)) {
5204 log_debug(ls, "recover_waiters_post aborted");
5205 error = -EINTR;
5206 break;
5207 }
5208
5209 /*
5210 * Find an lkb from the waiters list that's been affected by
5211 * recovery node changes, and needs to be reprocessed. Does
5212 * hold_lkb(), adding a refcount.
5213 */
5214 lkb = find_resend_waiter(ls);
5215 if (!lkb)
5216 break;
5217
5218 r = lkb->lkb_resource;
5219 hold_rsb(r);
5220 lock_rsb(r);
5221
5222 /*
5223 * If the lkb has been flagged for a force unlock or cancel,
5224 * then the reprocessing below will be replaced by just doing
5225 * the unlock/cancel directly.
5226 */
5227 mstype = lkb->lkb_wait_type;
5228 oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
5229 &lkb->lkb_iflags);
5230 ou = test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT,
5231 &lkb->lkb_iflags);
5232 err = 0;
5233
5234 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5235 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5236 "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5237 r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5238 dlm_dir_nodeid(r), oc, ou);
5239
5240 /*
5241 * No reply to the pre-recovery operation will now be received,
5242 * so a forced equivalent of remove_from_waiters() is needed to
5243 * reset the waiters state that was in place before recovery.
5244 */
5245
5246 clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5247
5248 /* Forcibly clear wait_type */
5249 lkb->lkb_wait_type = 0;
5250
5251 /*
5252 * Forcibly reset wait_count and associated refcount. The
5253 * wait_count will almost always be 1, but in case of an
5254 * overlapping unlock/cancel it could be 2: see where
5255 * add_to_waiters() finds the lkb is already on the waiters
5256 * list and does lkb_wait_count++; hold_lkb().
5257 */
5258 while (lkb->lkb_wait_count) {
5259 lkb->lkb_wait_count--;
5260 unhold_lkb(lkb);
5261 }
5262
5263 /* Forcibly remove from waiters list */
5264 spin_lock_bh(&ls->ls_waiters_lock);
5265 list_del_init(&lkb->lkb_wait_reply);
5266 spin_unlock_bh(&ls->ls_waiters_lock);
5267
5268 /*
5269 * The lkb is now clear of all prior waiters state and can be
5270 * processed locally, or sent to remote node again, or directly
5271 * cancelled/unlocked.
5272 */
5273
5274 if (oc || ou) {
5275 /* do an unlock or cancel instead of resending */
5276 switch (mstype) {
5277 case DLM_MSG_LOOKUP:
5278 case DLM_MSG_REQUEST:
5279 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5280 -DLM_ECANCEL);
5281 unhold_lkb(lkb); /* undoes create_lkb() */
5282 break;
5283 case DLM_MSG_CONVERT:
5284 if (oc) {
5285 queue_cast(r, lkb, -DLM_ECANCEL);
5286 } else {
5287 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5288 _unlock_lock(r, lkb);
5289 }
5290 break;
5291 default:
5292 err = 1;
5293 }
5294 } else {
5295 switch (mstype) {
5296 case DLM_MSG_LOOKUP:
5297 case DLM_MSG_REQUEST:
5298 _request_lock(r, lkb);
5299 if (r->res_nodeid != -1 && is_master(r))
5300 confirm_master(r, 0);
5301 break;
5302 case DLM_MSG_CONVERT:
5303 _convert_lock(r, lkb);
5304 break;
5305 default:
5306 err = 1;
5307 }
5308 }
5309
5310 if (err) {
5311 log_error(ls, "waiter %x msg %d r_nodeid %d "
5312 "dir_nodeid %d overlap %d %d",
5313 lkb->lkb_id, mstype, r->res_nodeid,
5314 dlm_dir_nodeid(r), oc, ou);
5315 }
5316 unlock_rsb(r);
5317 put_rsb(r);
5318 dlm_put_lkb(lkb);
5319 }
5320
5321 return error;
5322 }
5323
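/* Free master-copy lkbs left over from an earlier recovery sequence;
   copies added during the current recovery (lkb_recover_seq matches
   ls_recover_seq) are kept. */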
5324 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5325 struct list_head *list)
5326 {
5327 struct dlm_lkb *lkb, *safe;
5328
5329 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5330 if (!is_master_copy(lkb))
5331 continue;
5332
5333 /* don't purge lkbs we've added in recover_master_copy for
5334 the current recovery seq */
5335
5336 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5337 continue;
5338
5339 del_lkb(r, lkb);
5340
5341 /* this put should free the lkb */
5342 if (!dlm_put_lkb(lkb))
5343 log_error(ls, "purged mstcpy lkb not released");
5344 }
5345 }
5346
5347 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5348 {
5349 struct dlm_ls *ls = r->res_ls;
5350
5351 purge_mstcpy_list(ls, r, &r->res_grantqueue);
5352 purge_mstcpy_list(ls, r, &r->res_convertqueue);
5353 purge_mstcpy_list(ls, r, &r->res_waitqueue);
5354 }
5355
5356 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5357 struct list_head *list,
5358 int nodeid_gone, unsigned int *count)
5359 {
5360 struct dlm_lkb *lkb, *safe;
5361
5362 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5363 if (!is_master_copy(lkb))
5364 continue;
5365
5366 if ((lkb->lkb_nodeid == nodeid_gone) ||
5367 dlm_is_removed(ls, lkb->lkb_nodeid)) {
5368
5369 /* tell recover_lvb to invalidate the lvb
5370 because a node holding EX/PW failed */
5371 if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5372 (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5373 rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5374 }
5375
5376 del_lkb(r, lkb);
5377
5378 /* this put should free the lkb */
5379 if (!dlm_put_lkb(lkb))
5380 log_error(ls, "purged dead lkb not released");
5381
5382 rsb_set_flag(r, RSB_RECOVER_GRANT);
5383
5384 (*count)++;
5385 }
5386 }
5387 }
5388
5389 /* Get rid of locks held by nodes that are gone. */
5390
5391 void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list)
5392 {
5393 struct dlm_rsb *r;
5394 struct dlm_member *memb;
5395 int nodes_count = 0;
5396 int nodeid_gone = 0;
5397 unsigned int lkb_count = 0;
5398
5399 /* cache one removed nodeid to optimize the common
5400 case of a single node removed */
5401
5402 list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5403 nodes_count++;
5404 nodeid_gone = memb->nodeid;
5405 }
5406
5407 if (!nodes_count)
5408 return;
5409
5410 list_for_each_entry(r, root_list, res_root_list) {
5411 lock_rsb(r);
5412 if (r->res_nodeid != -1 && is_master(r)) {
5413 purge_dead_list(ls, r, &r->res_grantqueue,
5414 nodeid_gone, &lkb_count);
5415 purge_dead_list(ls, r, &r->res_convertqueue,
5416 nodeid_gone, &lkb_count);
5417 purge_dead_list(ls, r, &r->res_waitqueue,
5418 nodeid_gone, &lkb_count);
5419 }
5420 unlock_rsb(r);
5421
5422 cond_resched();
5423 }
5424
5425 if (lkb_count)
5426 log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5427 lkb_count, nodes_count);
5428 }
5429
5430 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
5431 {
5432 struct dlm_rsb *r;
5433
5434 read_lock_bh(&ls->ls_rsbtbl_lock);
5435 list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
5436 if (!rsb_flag(r, RSB_RECOVER_GRANT))
5437 continue;
5438 if (!is_master(r)) {
5439 rsb_clear_flag(r, RSB_RECOVER_GRANT);
5440 continue;
5441 }
5442 hold_rsb(r);
5443 read_unlock_bh(&ls->ls_rsbtbl_lock);
5444 return r;
5445 }
5446 read_unlock_bh(&ls->ls_rsbtbl_lock);
5447 return NULL;
5448 }
5449
5450 /*
5451 * Attempt to grant locks on resources that we are the master of.
5452 * Locks may have become grantable during recovery because locks
5453 * from departed nodes have been purged (or not rebuilt), allowing
5454 * previously blocked locks to now be granted. The subset of rsb's
5455 * we are interested in are those with lkb's on either the convert or
5456 * waiting queues.
5457 *
5458 * Simplest would be to go through each master rsb and check for non-empty
5459 * convert or waiting queues, and attempt to grant on those rsbs.
5460 * Checking the queues requires lock_rsb, though, for which we'd need
5461 * to release the rsbtbl lock. This would make iterating through all
5462 * rsb's very inefficient. So, we rely on earlier recovery routines
5463 * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5464 * locks for.
5465 */
5466
5467 void dlm_recover_grant(struct dlm_ls *ls)
5468 {
5469 struct dlm_rsb *r;
5470 unsigned int count = 0;
5471 unsigned int rsb_count = 0;
5472 unsigned int lkb_count = 0;
5473
5474 while (1) {
5475 r = find_grant_rsb(ls);
5476 if (!r)
5477 break;
5478
5479 rsb_count++;
5480 count = 0;
5481 lock_rsb(r);
5482 /* the RECOVER_GRANT flag is checked in the grant path */
5483 grant_pending_locks(r, &count);
5484 rsb_clear_flag(r, RSB_RECOVER_GRANT);
5485 lkb_count += count;
5486 confirm_master(r, 0);
5487 unlock_rsb(r);
5488 put_rsb(r);
5489 cond_resched();
5490 }
5491
5492 if (lkb_count)
5493 log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5494 lkb_count, rsb_count);
5495 }
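
/* Sketch of the expected ordering during recovery (an assumption about how
   the recovery path drives these helpers, shown only to connect the pieces
   in this file):

	dlm_recover_purge(ls, root_list);   purge locks of departed nodes and
					    set RSB_RECOVER_GRANT on masters
	...                                 rebuild locks from surviving nodes
	dlm_recover_grant(ls);              retry granting on flagged rsbs

   dlm_recover_grant() only visits rsbs flagged RSB_RECOVER_GRANT, found via
   find_grant_rsb(), so the whole rsb table never has to be walked while
   holding lock_rsb. */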
5496
5497 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5498 uint32_t remid)
5499 {
5500 struct dlm_lkb *lkb;
5501
5502 list_for_each_entry(lkb, head, lkb_statequeue) {
5503 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5504 return lkb;
5505 }
5506 return NULL;
5507 }
5508
5509 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5510 uint32_t remid)
5511 {
5512 struct dlm_lkb *lkb;
5513
5514 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5515 if (lkb)
5516 return lkb;
5517 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5518 if (lkb)
5519 return lkb;
5520 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5521 if (lkb)
5522 return lkb;
5523 return NULL;
5524 }
5525
5526 /* needs at least dlm_rcom + rcom_lock */
5527 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5528 struct dlm_rsb *r, const struct dlm_rcom *rc)
5529 {
5530 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5531
5532 lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5533 lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5534 lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5535 lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5536 dlm_set_dflags_val(lkb, le32_to_cpu(rl->rl_flags));
5537 set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
5538 lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5539 lkb->lkb_rqmode = rl->rl_rqmode;
5540 lkb->lkb_grmode = rl->rl_grmode;
5541 	/* don't set lkb_status because add_lkb wants to set it itself */
5542
5543 lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5544 lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5545
5546 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5547 int lvblen = le16_to_cpu(rc->rc_header.h_length) -
5548 sizeof(struct dlm_rcom) - sizeof(struct rcom_lock);
5549 if (lvblen > ls->ls_lvblen)
5550 return -EINVAL;
5551 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5552 if (!lkb->lkb_lvbptr)
5553 return -ENOMEM;
5554 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5555 }
5556
5557 /* Conversions between PR and CW (middle modes) need special handling.
5558 The real granted mode of these converting locks cannot be determined
5559 until all locks have been rebuilt on the rsb (recover_conversion) */
5560
5561 if (rl->rl_status == DLM_LKSTS_CONVERT && middle_conversion(lkb)) {
5562 /* We may need to adjust grmode depending on other granted locks. */
5563 log_limit(ls, "%s %x middle convert gr %d rq %d remote %d %x",
5564 __func__, lkb->lkb_id, lkb->lkb_grmode,
5565 lkb->lkb_rqmode, lkb->lkb_nodeid, lkb->lkb_remid);
5566 rsb_set_flag(r, RSB_RECOVER_CONVERT);
5567 }
5568
5569 return 0;
5570 }
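
/* Sketch of the rcom buffer layout that receive_rcom_lock_args() parses
   (lengths inferred from the code above; for illustration only):

	|<-- dlm_rcom -->|<-- rcom_lock -->|<-- optional lvb .......... -->|
	 rc               rl = rc->rc_buf   rl->rl_lvb, lvblen bytes

	lvblen = h_length - sizeof(struct dlm_rcom) - sizeof(struct rcom_lock)

   The lvb is only present when DLM_LKF_VALBLK is set in rl_exflags, and is
   rejected with -EINVAL if lvblen exceeds the lockspace lvblen. */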
5571
5572 /* This lkb may have been recovered in a previous aborted recovery so we need
5573 to check if the rsb already has an lkb with the given remote nodeid/lkid.
5574 If so we just send back a standard reply. If not, we create a new lkb with
5575 the given values and send back our lkid. We send back our lkid by sending
5576 back the rcom_lock struct we got but with the remid field filled in. */
5577
5578 /* needs at least dlm_rcom + rcom_lock */
5579 int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5580 __le32 *rl_remid, __le32 *rl_result)
5581 {
5582 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5583 struct dlm_rsb *r;
5584 struct dlm_lkb *lkb;
5585 uint32_t remid = 0;
5586 int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5587 int error;
5588
5589 /* init rl_remid with rcom lock rl_remid */
5590 *rl_remid = rl->rl_remid;
5591
5592 if (rl->rl_parent_lkid) {
5593 error = -EOPNOTSUPP;
5594 goto out;
5595 }
5596
5597 remid = le32_to_cpu(rl->rl_lkid);
5598
5599 /* In general we expect the rsb returned to be R_MASTER, but we don't
5600 have to require it. Recovery of masters on one node can overlap
5601 recovery of locks on another node, so one node can send us MSTCPY
5602 locks before we've made ourselves master of this rsb. We can still
5603 add new MSTCPY locks that we receive here without any harm; when
5604 we make ourselves master, dlm_recover_masters() won't touch the
5605 MSTCPY locks we've received early. */
5606
5607 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5608 from_nodeid, R_RECEIVE_RECOVER, &r);
5609 if (error)
5610 goto out;
5611
5612 lock_rsb(r);
5613
5614 if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5615 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5616 from_nodeid, remid);
5617 error = -EBADR;
5618 goto out_unlock;
5619 }
5620
5621 lkb = search_remid(r, from_nodeid, remid);
5622 if (lkb) {
5623 error = -EEXIST;
5624 goto out_remid;
5625 }
5626
5627 error = create_lkb(ls, &lkb);
5628 if (error)
5629 goto out_unlock;
5630
5631 error = receive_rcom_lock_args(ls, lkb, r, rc);
5632 if (error) {
5633 __put_lkb(ls, lkb);
5634 goto out_unlock;
5635 }
5636
5637 attach_lkb(r, lkb);
5638 add_lkb(r, lkb, rl->rl_status);
5639 ls->ls_recover_locks_in++;
5640
5641 if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5642 rsb_set_flag(r, RSB_RECOVER_GRANT);
5643
5644 out_remid:
5645 /* this is the new value returned to the lock holder for
5646 saving in its process-copy lkb */
5647 *rl_remid = cpu_to_le32(lkb->lkb_id);
5648
5649 lkb->lkb_recover_seq = ls->ls_recover_seq;
5650
5651 out_unlock:
5652 unlock_rsb(r);
5653 put_rsb(r);
5654 out:
5655 if (error && error != -EEXIST)
5656 log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5657 from_nodeid, remid, error);
5658 *rl_result = cpu_to_le32(error);
5659 return error;
5660 }
5661
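/* Assumed pairing of the two recovery handlers in this file (a sketch; the
   rcom send/receive glue lives outside this file).  L is the node rebuilding
   its locks, R is the new master:

	L: dlm_send_rcom_lock()        ->  R: dlm_recover_master_copy()
	L: dlm_recover_process_copy()  <-  R: rcom lock reply (rl_remid, rl_result)

   On success, L stores the returned rl_remid in lkb_remid, as done in
   dlm_recover_process_copy() below. */
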
5662 /* needs at least dlm_rcom + rcom_lock */
5663 int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5664 uint64_t seq)
5665 {
5666 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5667 struct dlm_rsb *r;
5668 struct dlm_lkb *lkb;
5669 uint32_t lkid, remid;
5670 int error, result;
5671
5672 lkid = le32_to_cpu(rl->rl_lkid);
5673 remid = le32_to_cpu(rl->rl_remid);
5674 result = le32_to_cpu(rl->rl_result);
5675
5676 error = find_lkb(ls, lkid, &lkb);
5677 if (error) {
5678 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5679 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5680 result);
5681 return error;
5682 }
5683
5684 r = lkb->lkb_resource;
5685 hold_rsb(r);
5686 lock_rsb(r);
5687
5688 if (!is_process_copy(lkb)) {
5689 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5690 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5691 result);
5692 dlm_dump_rsb(r);
5693 unlock_rsb(r);
5694 put_rsb(r);
5695 dlm_put_lkb(lkb);
5696 return -EINVAL;
5697 }
5698
5699 switch (result) {
5700 case -EBADR:
5701 /* There's a chance the new master received our lock before
5702 		   dlm_recover_master_reply(); this wouldn't happen if we did
5703 a barrier between recover_masters and recover_locks. */
5704
5705 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5706 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5707 result);
5708
5709 dlm_send_rcom_lock(r, lkb, seq);
5710 goto out;
5711 case -EEXIST:
5712 case 0:
5713 lkb->lkb_remid = remid;
5714 break;
5715 default:
5716 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5717 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5718 result);
5719 }
5720
5721 /* an ack for dlm_recover_locks() which waits for replies from
5722 all the locks it sends to new masters */
5723 dlm_recovered_lock(r);
5724 out:
5725 unlock_rsb(r);
5726 put_rsb(r);
5727 dlm_put_lkb(lkb);
5728
5729 return 0;
5730 }
5731
5732 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5733 int mode, uint32_t flags, void *name, unsigned int namelen)
5734 {
5735 struct dlm_lkb *lkb;
5736 struct dlm_args args;
5737 bool do_put = true;
5738 int error;
5739
5740 dlm_lock_recovery(ls);
5741
5742 error = create_lkb(ls, &lkb);
5743 if (error) {
5744 kfree(ua);
5745 goto out;
5746 }
5747
5748 trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
5749
5750 if (flags & DLM_LKF_VALBLK) {
5751 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5752 if (!ua->lksb.sb_lvbptr) {
5753 kfree(ua);
5754 error = -ENOMEM;
5755 goto out_put;
5756 }
5757 }
5758 error = set_lock_args(mode, &ua->lksb, flags, namelen, fake_astfn, ua,
5759 fake_bastfn, &args);
5760 if (error) {
5761 kfree(ua->lksb.sb_lvbptr);
5762 ua->lksb.sb_lvbptr = NULL;
5763 kfree(ua);
5764 goto out_put;
5765 }
5766
5767 /* After ua is attached to lkb it will be freed by dlm_free_lkb().
5768 When DLM_DFL_USER_BIT is set, the dlm knows that this is a userspace
5769 lock and that lkb_astparam is the dlm_user_args structure. */
5770 set_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags);
5771 error = request_lock(ls, lkb, name, namelen, &args);
5772
5773 switch (error) {
5774 case 0:
5775 break;
5776 case -EINPROGRESS:
5777 error = 0;
5778 break;
5779 case -EAGAIN:
5780 error = 0;
5781 fallthrough;
5782 default:
5783 goto out_put;
5784 }
5785
5786 /* add this new lkb to the per-process list of locks */
5787 spin_lock_bh(&ua->proc->locks_spin);
5788 hold_lkb(lkb);
5789 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5790 spin_unlock_bh(&ua->proc->locks_spin);
5791 do_put = false;
5792 out_put:
5793 trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
5794 if (do_put)
5795 __put_lkb(ls, lkb);
5796 out:
5797 dlm_unlock_recovery(ls);
5798 return error;
5799 }
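
/* Rough usage sketch for the dlm_user_* entry points (an assumption about
   the caller, which lives in the character-device code, not in this file):

	ua = allocated dlm_user_args, ua->proc set for the opening process
	error = dlm_user_request(ls, ua, mode, flags, name, namelen);
	...completion arrives later as a callback queued on ua->proc->asts,
	   which the device read path delivers to userspace.

   On success the lkb ends up on ua->proc->locks and is cleaned up by
   dlm_clear_proc_locks() when the device is closed. */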
5800
5801 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5802 int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
5803 {
5804 struct dlm_lkb *lkb;
5805 struct dlm_args args;
5806 struct dlm_user_args *ua;
5807 int error;
5808
5809 dlm_lock_recovery(ls);
5810
5811 error = find_lkb(ls, lkid, &lkb);
5812 if (error)
5813 goto out;
5814
5815 trace_dlm_lock_start(ls, lkb, NULL, 0, mode, flags);
5816
5817 /* user can change the params on its lock when it converts it, or
5818 add an lvb that didn't exist before */
5819
5820 ua = lkb->lkb_ua;
5821
5822 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5823 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5824 if (!ua->lksb.sb_lvbptr) {
5825 error = -ENOMEM;
5826 goto out_put;
5827 }
5828 }
5829 if (lvb_in && ua->lksb.sb_lvbptr)
5830 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5831
5832 ua->xid = ua_tmp->xid;
5833 ua->castparam = ua_tmp->castparam;
5834 ua->castaddr = ua_tmp->castaddr;
5835 ua->bastparam = ua_tmp->bastparam;
5836 ua->bastaddr = ua_tmp->bastaddr;
5837 ua->user_lksb = ua_tmp->user_lksb;
5838
5839 error = set_lock_args(mode, &ua->lksb, flags, 0, fake_astfn, ua,
5840 fake_bastfn, &args);
5841 if (error)
5842 goto out_put;
5843
5844 error = convert_lock(ls, lkb, &args);
5845
5846 if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5847 error = 0;
5848 out_put:
5849 trace_dlm_lock_end(ls, lkb, NULL, 0, mode, flags, error, false);
5850 dlm_put_lkb(lkb);
5851 out:
5852 dlm_unlock_recovery(ls);
5853 kfree(ua_tmp);
5854 return error;
5855 }
5856
5857 /*
5858 * The caller asks for an orphan lock on a given resource with a given mode.
5859 * If a matching lock exists, it's moved to the owner's list of locks and
5860 * the lkid is returned.
5861 */
5862
5863 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5864 int mode, uint32_t flags, void *name, unsigned int namelen,
5865 uint32_t *lkid)
5866 {
5867 struct dlm_lkb *lkb = NULL, *iter;
5868 struct dlm_user_args *ua;
5869 int found_other_mode = 0;
5870 int rv = 0;
5871
5872 spin_lock_bh(&ls->ls_orphans_lock);
5873 list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
5874 if (iter->lkb_resource->res_length != namelen)
5875 continue;
5876 if (memcmp(iter->lkb_resource->res_name, name, namelen))
5877 continue;
5878 if (iter->lkb_grmode != mode) {
5879 found_other_mode = 1;
5880 continue;
5881 }
5882
5883 lkb = iter;
5884 list_del_init(&iter->lkb_ownqueue);
5885 clear_bit(DLM_DFL_ORPHAN_BIT, &iter->lkb_dflags);
5886 *lkid = iter->lkb_id;
5887 break;
5888 }
5889 spin_unlock_bh(&ls->ls_orphans_lock);
5890
5891 if (!lkb && found_other_mode) {
5892 rv = -EAGAIN;
5893 goto out;
5894 }
5895
5896 if (!lkb) {
5897 rv = -ENOENT;
5898 goto out;
5899 }
5900
5901 lkb->lkb_exflags = flags;
5902 lkb->lkb_ownpid = (int) current->pid;
5903
5904 ua = lkb->lkb_ua;
5905
5906 ua->proc = ua_tmp->proc;
5907 ua->xid = ua_tmp->xid;
5908 ua->castparam = ua_tmp->castparam;
5909 ua->castaddr = ua_tmp->castaddr;
5910 ua->bastparam = ua_tmp->bastparam;
5911 ua->bastaddr = ua_tmp->bastaddr;
5912 ua->user_lksb = ua_tmp->user_lksb;
5913
5914 /*
5915 * The lkb reference from the ls_orphans list was not
5916 * removed above, and is now considered the reference
5917 * for the proc locks list.
5918 */
5919
5920 spin_lock_bh(&ua->proc->locks_spin);
5921 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5922 spin_unlock_bh(&ua->proc->locks_spin);
5923 out:
5924 kfree(ua_tmp);
5925 return rv;
5926 }
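
/* Sketch of the orphan lifecycle tying together several functions in this
   file (illustrative ordering, not a single code path):

	process exits holding a DLM_LKF_PERSISTENT lock
	  -> dlm_clear_proc_locks() / del_proc_lock() marks it ORPHAN
	  -> orphan_proc_lock() moves it to ls->ls_orphans

	a new process later either:
	  -> dlm_user_adopt_orphan()      reclaims it onto its proc->locks list, or
	  -> dlm_user_purge() / do_purge() force-unlocks and frees it */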
5927
5928 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5929 uint32_t flags, uint32_t lkid, char *lvb_in)
5930 {
5931 struct dlm_lkb *lkb;
5932 struct dlm_args args;
5933 struct dlm_user_args *ua;
5934 int error;
5935
5936 dlm_lock_recovery(ls);
5937
5938 error = find_lkb(ls, lkid, &lkb);
5939 if (error)
5940 goto out;
5941
5942 trace_dlm_unlock_start(ls, lkb, flags);
5943
5944 ua = lkb->lkb_ua;
5945
5946 if (lvb_in && ua->lksb.sb_lvbptr)
5947 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5948 if (ua_tmp->castparam)
5949 ua->castparam = ua_tmp->castparam;
5950 ua->user_lksb = ua_tmp->user_lksb;
5951
5952 error = set_unlock_args(flags, ua, &args);
5953 if (error)
5954 goto out_put;
5955
5956 error = unlock_lock(ls, lkb, &args);
5957
5958 if (error == -DLM_EUNLOCK)
5959 error = 0;
5960 /* from validate_unlock_args() */
5961 if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5962 error = 0;
5963 if (error)
5964 goto out_put;
5965
5966 spin_lock_bh(&ua->proc->locks_spin);
5967 /* dlm_user_add_cb() may have already taken lkb off the proc list */
5968 if (!list_empty(&lkb->lkb_ownqueue))
5969 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5970 spin_unlock_bh(&ua->proc->locks_spin);
5971 out_put:
5972 trace_dlm_unlock_end(ls, lkb, flags, error);
5973 dlm_put_lkb(lkb);
5974 out:
5975 dlm_unlock_recovery(ls);
5976 kfree(ua_tmp);
5977 return error;
5978 }
5979
5980 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5981 uint32_t flags, uint32_t lkid)
5982 {
5983 struct dlm_lkb *lkb;
5984 struct dlm_args args;
5985 struct dlm_user_args *ua;
5986 int error;
5987
5988 dlm_lock_recovery(ls);
5989
5990 error = find_lkb(ls, lkid, &lkb);
5991 if (error)
5992 goto out;
5993
5994 trace_dlm_unlock_start(ls, lkb, flags);
5995
5996 ua = lkb->lkb_ua;
5997 if (ua_tmp->castparam)
5998 ua->castparam = ua_tmp->castparam;
5999 ua->user_lksb = ua_tmp->user_lksb;
6000
6001 error = set_unlock_args(flags, ua, &args);
6002 if (error)
6003 goto out_put;
6004
6005 error = cancel_lock(ls, lkb, &args);
6006
6007 if (error == -DLM_ECANCEL)
6008 error = 0;
6009 /* from validate_unlock_args() */
6010 if (error == -EBUSY)
6011 error = 0;
6012 out_put:
6013 trace_dlm_unlock_end(ls, lkb, flags, error);
6014 dlm_put_lkb(lkb);
6015 out:
6016 dlm_unlock_recovery(ls);
6017 kfree(ua_tmp);
6018 return error;
6019 }
6020
6021 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
6022 {
6023 struct dlm_lkb *lkb;
6024 struct dlm_args args;
6025 struct dlm_user_args *ua;
6026 struct dlm_rsb *r;
6027 int error;
6028
6029 dlm_lock_recovery(ls);
6030
6031 error = find_lkb(ls, lkid, &lkb);
6032 if (error)
6033 goto out;
6034
6035 trace_dlm_unlock_start(ls, lkb, flags);
6036
6037 ua = lkb->lkb_ua;
6038
6039 error = set_unlock_args(flags, ua, &args);
6040 if (error)
6041 goto out_put;
6042
6043 /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
6044
6045 r = lkb->lkb_resource;
6046 hold_rsb(r);
6047 lock_rsb(r);
6048
6049 error = validate_unlock_args(lkb, &args);
6050 if (error)
6051 goto out_r;
6052 set_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags);
6053
6054 error = _cancel_lock(r, lkb);
6055 out_r:
6056 unlock_rsb(r);
6057 put_rsb(r);
6058
6059 if (error == -DLM_ECANCEL)
6060 error = 0;
6061 /* from validate_unlock_args() */
6062 if (error == -EBUSY)
6063 error = 0;
6064 out_put:
6065 trace_dlm_unlock_end(ls, lkb, flags, error);
6066 dlm_put_lkb(lkb);
6067 out:
6068 dlm_unlock_recovery(ls);
6069 return error;
6070 }
6071
6072 /* lkb's that are removed from the waiters list by revert are just left on the
6073 orphans list with the granted orphan locks, to be freed by purge */
6074
6075 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6076 {
6077 struct dlm_args args;
6078 int error;
6079
6080 hold_lkb(lkb); /* reference for the ls_orphans list */
6081 spin_lock_bh(&ls->ls_orphans_lock);
6082 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6083 spin_unlock_bh(&ls->ls_orphans_lock);
6084
6085 set_unlock_args(0, lkb->lkb_ua, &args);
6086
6087 error = cancel_lock(ls, lkb, &args);
6088 if (error == -DLM_ECANCEL)
6089 error = 0;
6090 return error;
6091 }
6092
6093 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
6094 granted. Regardless of what rsb queue the lock is on, it's removed and
6095 freed. The IVVALBLK flag causes the lvb on the resource to be invalidated
6096 if our lock is PW/EX (it's ignored if our granted mode is smaller.) */
6097
6098 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6099 {
6100 struct dlm_args args;
6101 int error;
6102
6103 set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
6104 lkb->lkb_ua, &args);
6105
6106 error = unlock_lock(ls, lkb, &args);
6107 if (error == -DLM_EUNLOCK)
6108 error = 0;
6109 return error;
6110 }
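
/* For comparison, a kernel lockspace user asking for the same forced
   teardown directly would pass the same flags to dlm_unlock() (a sketch,
   assuming the usual linux/dlm.h API):

	error = dlm_unlock(lockspace, lkid,
			   DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
			   &lksb, astarg);

   unlock_proc_lock() builds the equivalent args internally via
   set_unlock_args() for locks owned by an exiting userspace process. */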
6111
6112 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
6113 (which does lock_rsb) due to deadlock with receiving a message that does
6114 lock_rsb followed by dlm_user_add_cb() */
6115
6116 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
6117 struct dlm_user_proc *proc)
6118 {
6119 struct dlm_lkb *lkb = NULL;
6120
6121 spin_lock_bh(&ls->ls_clear_proc_locks);
6122 if (list_empty(&proc->locks))
6123 goto out;
6124
6125 lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
6126 list_del_init(&lkb->lkb_ownqueue);
6127
6128 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6129 set_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags);
6130 else
6131 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6132 out:
6133 spin_unlock_bh(&ls->ls_clear_proc_locks);
6134 return lkb;
6135 }
6136
6137 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
6138 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
6139 which we clear here. */
6140
6141 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
6142 list, and no more device_writes should add lkb's to proc->locks list; so we
6143    shouldn't need to take asts_spin or locks_spin here. This assumes that
6144    device reads/writes/closes are serialized -- FIXME: we may need to serialize
6145    them ourselves. */
6146
6147 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6148 {
6149 struct dlm_callback *cb, *cb_safe;
6150 struct dlm_lkb *lkb, *safe;
6151
6152 dlm_lock_recovery(ls);
6153
6154 while (1) {
6155 lkb = del_proc_lock(ls, proc);
6156 if (!lkb)
6157 break;
6158 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6159 orphan_proc_lock(ls, lkb);
6160 else
6161 unlock_proc_lock(ls, lkb);
6162
6163 /* this removes the reference for the proc->locks list
6164 added by dlm_user_request, it may result in the lkb
6165 being freed */
6166
6167 dlm_put_lkb(lkb);
6168 }
6169
6170 spin_lock_bh(&ls->ls_clear_proc_locks);
6171
6172 /* in-progress unlocks */
6173 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6174 list_del_init(&lkb->lkb_ownqueue);
6175 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6176 dlm_put_lkb(lkb);
6177 }
6178
6179 list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6180 list_del(&cb->list);
6181 dlm_free_cb(cb);
6182 }
6183
6184 spin_unlock_bh(&ls->ls_clear_proc_locks);
6185 dlm_unlock_recovery(ls);
6186 }
6187
6188 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6189 {
6190 struct dlm_callback *cb, *cb_safe;
6191 struct dlm_lkb *lkb, *safe;
6192
6193 while (1) {
6194 lkb = NULL;
6195 spin_lock_bh(&proc->locks_spin);
6196 if (!list_empty(&proc->locks)) {
6197 lkb = list_entry(proc->locks.next, struct dlm_lkb,
6198 lkb_ownqueue);
6199 list_del_init(&lkb->lkb_ownqueue);
6200 }
6201 spin_unlock_bh(&proc->locks_spin);
6202
6203 if (!lkb)
6204 break;
6205
6206 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6207 unlock_proc_lock(ls, lkb);
6208 dlm_put_lkb(lkb); /* ref from proc->locks list */
6209 }
6210
6211 spin_lock_bh(&proc->locks_spin);
6212 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6213 list_del_init(&lkb->lkb_ownqueue);
6214 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6215 dlm_put_lkb(lkb);
6216 }
6217 spin_unlock_bh(&proc->locks_spin);
6218
6219 spin_lock_bh(&proc->asts_spin);
6220 list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6221 list_del(&cb->list);
6222 dlm_free_cb(cb);
6223 }
6224 spin_unlock_bh(&proc->asts_spin);
6225 }
6226
6227 /* pid of 0 means purge all orphans */
6228
6229 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6230 {
6231 struct dlm_lkb *lkb, *safe;
6232
6233 spin_lock_bh(&ls->ls_orphans_lock);
6234 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6235 if (pid && lkb->lkb_ownpid != pid)
6236 continue;
6237 unlock_proc_lock(ls, lkb);
6238 list_del_init(&lkb->lkb_ownqueue);
6239 dlm_put_lkb(lkb);
6240 }
6241 spin_unlock_bh(&ls->ls_orphans_lock);
6242 }
6243
6244 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6245 {
6246 struct dlm_message *ms;
6247 struct dlm_mhandle *mh;
6248 int error;
6249
6250 error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6251 DLM_MSG_PURGE, &ms, &mh);
6252 if (error)
6253 return error;
6254 ms->m_nodeid = cpu_to_le32(nodeid);
6255 ms->m_pid = cpu_to_le32(pid);
6256
6257 return send_message(mh, ms, NULL, 0);
6258 }
6259
6260 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6261 int nodeid, int pid)
6262 {
6263 int error = 0;
6264
6265 if (nodeid && (nodeid != dlm_our_nodeid())) {
6266 error = send_purge(ls, nodeid, pid);
6267 } else {
6268 dlm_lock_recovery(ls);
6269 if (pid == current->pid)
6270 purge_proc_locks(ls, proc);
6271 else
6272 do_purge(ls, nodeid, pid);
6273 dlm_unlock_recovery(ls);
6274 }
6275 return error;
6276 }
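
/* Sketch of how a purge request is routed (assumption: DLM_MSG_PURGE is
   handled by the normal message receive path elsewhere in this file):

	dlm_user_purge(ls, proc, nodeid, pid)
	  nodeid == 0 or our nodeid:  purge locally
	    pid == current->pid  -> purge_proc_locks(ls, proc)
	    otherwise            -> do_purge(ls, nodeid, pid)
	  nodeid is remote:           send_purge() -> DLM_MSG_PURGE,
	                              remote node runs do_purge()

   A pid of 0 purges all orphans on the target node. */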
6277
6278 /* debug functionality */
6279 int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
6280 int lkb_nodeid, unsigned int lkb_dflags, int lkb_status)
6281 {
6282 struct dlm_lksb *lksb;
6283 struct dlm_lkb *lkb;
6284 struct dlm_rsb *r;
6285 int error;
6286
6287 /* we currently can't set a valid user lock */
6288 if (lkb_dflags & BIT(DLM_DFL_USER_BIT))
6289 return -EOPNOTSUPP;
6290
6291 lksb = kzalloc(sizeof(*lksb), GFP_NOFS);
6292 if (!lksb)
6293 return -ENOMEM;
6294
6295 error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
6296 if (error) {
6297 kfree(lksb);
6298 return error;
6299 }
6300
6301 dlm_set_dflags_val(lkb, lkb_dflags);
6302 lkb->lkb_nodeid = lkb_nodeid;
6303 lkb->lkb_lksb = lksb;
6304 /* user specific pointer, just don't have it NULL for kernel locks */
6305 if (~lkb_dflags & BIT(DLM_DFL_USER_BIT))
6306 lkb->lkb_astparam = (void *)0xDEADBEEF;
6307
6308 error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
6309 if (error) {
6310 kfree(lksb);
6311 __put_lkb(ls, lkb);
6312 return error;
6313 }
6314
6315 lock_rsb(r);
6316 attach_lkb(r, lkb);
6317 add_lkb(r, lkb, lkb_status);
6318 unlock_rsb(r);
6319 put_rsb(r);
6320
6321 return 0;
6322 }
6323
6324 int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
6325 int mstype, int to_nodeid)
6326 {
6327 struct dlm_lkb *lkb;
6328 int error;
6329
6330 error = find_lkb(ls, lkb_id, &lkb);
6331 if (error)
6332 return error;
6333
6334 add_to_waiters(lkb, mstype, to_nodeid);
6335 dlm_put_lkb(lkb);
6336 return 0;
6337 }
6338
6339