1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
4 **
5 ** Copyright (C) 2005-2010 Red Hat, Inc. All rights reserved.
6 **
7 **
8 *******************************************************************************
9 ******************************************************************************/
10
11 /* Central locking logic has four stages:
12
13 dlm_lock()
14 dlm_unlock()
15
16 request_lock(ls, lkb)
17 convert_lock(ls, lkb)
18 unlock_lock(ls, lkb)
19 cancel_lock(ls, lkb)
20
21 _request_lock(r, lkb)
22 _convert_lock(r, lkb)
23 _unlock_lock(r, lkb)
24 _cancel_lock(r, lkb)
25
26 do_request(r, lkb)
27 do_convert(r, lkb)
28 do_unlock(r, lkb)
29 do_cancel(r, lkb)
30
31 Stage 1 (lock, unlock) is mainly about checking input args and
32 splitting into one of the four main operations:
33
34 dlm_lock = request_lock
35 dlm_lock+CONVERT = convert_lock
36 dlm_unlock = unlock_lock
37 dlm_unlock+CANCEL = cancel_lock
38
39 Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
40 provided to the next stage.
41
42 Stage 3, _xxxx_lock(), determines if the operation is local or remote.
43 When remote, it calls send_xxxx(), when local it calls do_xxxx().
44
45 Stage 4, do_xxxx(), is the guts of the operation. It manipulates the
46 given rsb and lkb and queues callbacks.
47
48 For remote operations, send_xxxx() results in the corresponding do_xxxx()
49 function being executed on the remote node. The connecting send/receive
50 calls on local (L) and remote (R) nodes:
51
52 L: send_xxxx() -> R: receive_xxxx()
53 R: do_xxxx()
54 L: receive_xxxx_reply() <- R: send_xxxx_reply()
55 */
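/* A minimal sketch of how a kernel user enters stage 1. This is
   illustrative only (the lockspace handle "ls" and the my_ast/my_bast
   callbacks are placeholders); see include/linux/dlm.h for the exact
   prototypes and flag definitions:

	struct dlm_lksb lksb;
	int error;

	// dlm_lock -> request_lock()
	error = dlm_lock(ls, DLM_LOCK_EX, &lksb, 0, "myres", 5, 0,
			 my_ast, my_arg, my_bast);

	// dlm_lock + DLM_LKF_CONVERT -> convert_lock()
	error = dlm_lock(ls, DLM_LOCK_PR, &lksb, DLM_LKF_CONVERT, "myres", 5,
			 0, my_ast, my_arg, my_bast);

	// dlm_unlock -> unlock_lock(); adding DLM_LKF_CANCEL -> cancel_lock()
	error = dlm_unlock(ls, lksb.sb_lkid, 0, &lksb, my_arg);
*/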
56 #include <trace/events/dlm.h>
57
58 #include <linux/types.h>
59 #include <linux/rbtree.h>
60 #include <linux/slab.h>
61 #include "dlm_internal.h"
62 #include <linux/dlm_device.h>
63 #include "memory.h"
64 #include "midcomms.h"
65 #include "requestqueue.h"
66 #include "util.h"
67 #include "dir.h"
68 #include "member.h"
69 #include "lockspace.h"
70 #include "ast.h"
71 #include "lock.h"
72 #include "rcom.h"
73 #include "recover.h"
74 #include "lvb_table.h"
75 #include "user.h"
76 #include "config.h"
77
78 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
84 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int send_remove(struct dlm_rsb *r);
86 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
88 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
89 const struct dlm_message *ms, bool local);
90 static int receive_extralen(const struct dlm_message *ms);
91 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
92 static void deactivate_rsb(struct kref *kref);
93
94 /*
95 * Lock compatibility matrix - thanks Steve
96 * UN = Unlocked state. Not really a state, used as a flag
97 * PD = Padding. Used to make the matrix a nice power of two in size
98 * Other states are the same as the VMS DLM.
99 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same)
100 */
101
102 static const int __dlm_compat_matrix[8][8] = {
103 /* UN NL CR CW PR PW EX PD */
104 {1, 1, 1, 1, 1, 1, 1, 0}, /* UN */
105 {1, 1, 1, 1, 1, 1, 1, 0}, /* NL */
106 {1, 1, 1, 1, 1, 1, 0, 0}, /* CR */
107 {1, 1, 1, 1, 0, 0, 0, 0}, /* CW */
108 {1, 1, 1, 0, 1, 0, 0, 0}, /* PR */
109 {1, 1, 1, 0, 0, 0, 0, 0}, /* PW */
110 {1, 1, 0, 0, 0, 0, 0, 0}, /* EX */
111 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */
112 };
113
114 /*
115 * This defines the direction of transfer of LVB data.
116 * Granted mode is the row; requested mode is the column.
117 * Usage: matrix[grmode+1][rqmode+1]
118 * 1 = LVB is returned to the caller
119 * 0 = LVB is written to the resource
120 * -1 = nothing happens to the LVB
121 */
122
123 const int dlm_lvb_operations[8][8] = {
124 /* UN NL CR CW PR PW EX PD*/
125 { -1, 1, 1, 1, 1, 1, 1, -1 }, /* UN */
126 { -1, 1, 1, 1, 1, 1, 1, 0 }, /* NL */
127 { -1, -1, 1, 1, 1, 1, 1, 0 }, /* CR */
128 { -1, -1, -1, 1, 1, 1, 1, 0 }, /* CW */
129 { -1, -1, -1, -1, 1, 1, 1, 0 }, /* PR */
130 { -1, 0, 0, 0, 0, 0, 1, 0 }, /* PW */
131 { -1, 0, 0, 0, 0, 0, 0, 0 }, /* EX */
132 { -1, 0, 0, 0, 0, 0, 0, 0 } /* PD */
133 };
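/* Worked example of reading the table above: converting up from NL (row)
 * to EX (column) gives 1, so the resource's LVB is copied back into the
 * caller's lksb; converting down from EX (row) to NL (column) gives 0,
 * so the caller's LVB is written to the resource; entries of -1 (e.g.
 * the whole UN column) leave the LVB untouched.
 */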
134
135 #define modes_compat(gr, rq) \
136 __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
137
138 int dlm_modes_compat(int mode1, int mode2)
139 {
140 return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
141 }
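/* Illustrative use of the compatibility check (the DLM_LOCK_* mode
 * constants come from the DLM uapi headers):
 *
 *	dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR);	// 1: PR and PR coexist
 *	dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_EX);	// 0: EX conflicts with PR
 *
 * The modes_compat() macro above performs the same lookup using the
 * granted mode of one lkb and the requested mode of another.
 */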
142
143 /*
144 * Compatibility matrix for conversions with QUECVT set.
145 * Granted mode is the row; requested mode is the column.
146 * Usage: matrix[grmode+1][rqmode+1]
147 */
148
149 static const int __quecvt_compat_matrix[8][8] = {
150 /* UN NL CR CW PR PW EX PD */
151 {0, 0, 0, 0, 0, 0, 0, 0}, /* UN */
152 {0, 0, 1, 1, 1, 1, 1, 0}, /* NL */
153 {0, 0, 0, 1, 1, 1, 1, 0}, /* CR */
154 {0, 0, 0, 0, 1, 1, 1, 0}, /* CW */
155 {0, 0, 0, 1, 0, 1, 1, 0}, /* PR */
156 {0, 0, 0, 0, 0, 0, 1, 0}, /* PW */
157 {0, 0, 0, 0, 0, 0, 0, 0}, /* EX */
158 {0, 0, 0, 0, 0, 0, 0, 0} /* PD */
159 };
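/* Rough reading of the QUECVT table (a sketch of the intent; the
 * authoritative check is done when conversion arguments are validated):
 * a 1 entry means a conversion from the granted mode (row) to the
 * requested mode (column) may use DLM_LKF_QUECVT, i.e. it is a strictly
 * "upward" conversion such as NL -> EX or CR -> PW; 0 entries, such as
 * converting away from EX, are rejected as invalid.
 */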
160
161 void dlm_print_lkb(struct dlm_lkb *lkb)
162 {
163 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
164 "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
165 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
166 dlm_iflags_val(lkb), lkb->lkb_status, lkb->lkb_rqmode,
167 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
168 (unsigned long long)lkb->lkb_recover_seq);
169 }
170
171 static void dlm_print_rsb(struct dlm_rsb *r)
172 {
173 printk(KERN_ERR "rsb: nodeid %d master %d dir %d flags %lx first %x "
174 "rlc %d name %s\n",
175 r->res_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
176 r->res_flags, r->res_first_lkid, r->res_recover_locks_count,
177 r->res_name);
178 }
179
180 void dlm_dump_rsb(struct dlm_rsb *r)
181 {
182 struct dlm_lkb *lkb;
183
184 dlm_print_rsb(r);
185
186 printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
187 list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
188 printk(KERN_ERR "rsb lookup list\n");
189 list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
190 dlm_print_lkb(lkb);
191 printk(KERN_ERR "rsb grant queue:\n");
192 list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
193 dlm_print_lkb(lkb);
194 printk(KERN_ERR "rsb convert queue:\n");
195 list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
196 dlm_print_lkb(lkb);
197 printk(KERN_ERR "rsb wait queue:\n");
198 list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
199 dlm_print_lkb(lkb);
200 }
201
202 /* Threads cannot use the lockspace while it's being recovered */
203
204 void dlm_lock_recovery(struct dlm_ls *ls)
205 {
206 down_read(&ls->ls_in_recovery);
207 }
208
209 void dlm_unlock_recovery(struct dlm_ls *ls)
210 {
211 up_read(&ls->ls_in_recovery);
212 }
213
214 int dlm_lock_recovery_try(struct dlm_ls *ls)
215 {
216 return down_read_trylock(&ls->ls_in_recovery);
217 }
218
219 static inline int can_be_queued(struct dlm_lkb *lkb)
220 {
221 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
222 }
223
224 static inline int force_blocking_asts(struct dlm_lkb *lkb)
225 {
226 return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
227 }
228
229 static inline int is_demoted(struct dlm_lkb *lkb)
230 {
231 return test_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
232 }
233
234 static inline int is_altmode(struct dlm_lkb *lkb)
235 {
236 return test_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
237 }
238
239 static inline int is_granted(struct dlm_lkb *lkb)
240 {
241 return (lkb->lkb_status == DLM_LKSTS_GRANTED);
242 }
243
244 static inline int is_remote(struct dlm_rsb *r)
245 {
246 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
247 return !!r->res_nodeid;
248 }
249
250 static inline int is_process_copy(struct dlm_lkb *lkb)
251 {
252 return lkb->lkb_nodeid &&
253 !test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
254 }
255
256 static inline int is_master_copy(struct dlm_lkb *lkb)
257 {
258 return test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
259 }
260
261 static inline int middle_conversion(struct dlm_lkb *lkb)
262 {
263 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
264 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
265 return 1;
266 return 0;
267 }
268
269 static inline int down_conversion(struct dlm_lkb *lkb)
270 {
271 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
272 }
273
274 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
275 {
276 return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
277 }
278
279 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
280 {
281 return test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
282 }
283
284 static inline int is_overlap(struct dlm_lkb *lkb)
285 {
286 return test_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags) ||
287 test_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
288 }
289
290 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
291 {
292 if (is_master_copy(lkb))
293 return;
294
295 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
296
297 if (rv == -DLM_ECANCEL &&
298 test_and_clear_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags))
299 rv = -EDEADLK;
300
301 dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, dlm_sbflags_val(lkb));
302 }
303
304 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
305 {
306 queue_cast(r, lkb,
307 is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
308 }
309
310 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
311 {
312 if (is_master_copy(lkb)) {
313 send_bast(r, lkb, rqmode);
314 } else {
315 dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
316 }
317 }
318
319 /*
320 * Basic operations on rsb's and lkb's
321 */
322
323 static inline unsigned long rsb_toss_jiffies(void)
324 {
325 return jiffies + (READ_ONCE(dlm_config.ci_toss_secs) * HZ);
326 }
327
328 /* This is only called to add a reference when the code already holds
329 a valid reference to the rsb, so there's no need for locking. */
330
331 static inline void hold_rsb(struct dlm_rsb *r)
332 {
333 /* inactive rsbs are not ref counted */
334 WARN_ON(rsb_flag(r, RSB_INACTIVE));
335 kref_get(&r->res_ref);
336 }
337
338 void dlm_hold_rsb(struct dlm_rsb *r)
339 {
340 hold_rsb(r);
341 }
342
343 /* TODO move this to lib/refcount.c */
344 static __must_check bool
345 dlm_refcount_dec_and_write_lock_bh(refcount_t *r, rwlock_t *lock)
346 __cond_acquires(true, lock)
347 {
348 if (refcount_dec_not_one(r))
349 return false;
350
351 write_lock_bh(lock);
352 if (!refcount_dec_and_test(r)) {
353 write_unlock_bh(lock);
354 return false;
355 }
356
357 return true;
358 }
359
360 /* TODO move this to include/linux/kref.h */
361 static inline int dlm_kref_put_write_lock_bh(struct kref *kref,
362 void (*release)(struct kref *kref),
363 rwlock_t *lock)
364 {
365 if (dlm_refcount_dec_and_write_lock_bh(&kref->refcount, lock)) {
366 release(kref);
367 return 1;
368 }
369
370 return 0;
371 }
372
373 static void put_rsb(struct dlm_rsb *r)
374 {
375 struct dlm_ls *ls = r->res_ls;
376 int rv;
377
378 rv = dlm_kref_put_write_lock_bh(&r->res_ref, deactivate_rsb,
379 &ls->ls_rsbtbl_lock);
380 if (rv)
381 write_unlock_bh(&ls->ls_rsbtbl_lock);
382 }
383
384 void dlm_put_rsb(struct dlm_rsb *r)
385 {
386 put_rsb(r);
387 }
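/* Typical (illustrative) pairing for code that already has a valid rsb
 * pointer and needs to keep it across an operation:
 *
 *	dlm_hold_rsb(r);
 *	... use r while holding the extra reference ...
 *	dlm_put_rsb(r);
 *
 * The put may drop the last reference, in which case deactivate_rsb()
 * runs under ls_rsbtbl_lock via dlm_kref_put_write_lock_bh() above.
 */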
388
389 /* Works together with timer_delete_sync() in dlm_ls_stop() to stop
390 * arming new timers when recovery is triggered; they are not run
391 * again until resume_scan_timer() re-arms them.
392 */
393 static void enable_scan_timer(struct dlm_ls *ls, unsigned long jiffies)
394 {
395 if (!dlm_locking_stopped(ls))
396 mod_timer(&ls->ls_scan_timer, jiffies);
397 }
398
399 /* This function tries to resume the timer callback if an rsb
400 * is on the scan list and no timer is pending. It may be that
401 * the first entry is currently being executed as the timer
402 * callback, but we don't care if a timer is queued up again
403 * and does nothing. This should be a rare case.
404 */
405 void resume_scan_timer(struct dlm_ls *ls)
406 {
407 struct dlm_rsb *r;
408
409 spin_lock_bh(&ls->ls_scan_lock);
410 r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
411 res_scan_list);
412 if (r && !timer_pending(&ls->ls_scan_timer))
413 enable_scan_timer(ls, r->res_toss_time);
414 spin_unlock_bh(&ls->ls_scan_lock);
415 }
416
417 /* ls_rsbtbl_lock must be held */
418
419 static void del_scan(struct dlm_ls *ls, struct dlm_rsb *r)
420 {
421 struct dlm_rsb *first;
422
423 /* active rsbs should never be on the scan list */
424 WARN_ON(!rsb_flag(r, RSB_INACTIVE));
425
426 spin_lock_bh(&ls->ls_scan_lock);
427 r->res_toss_time = 0;
428
429 /* if the rsb is not queued do nothing */
430 if (list_empty(&r->res_scan_list))
431 goto out;
432
433 /* get the first element before delete */
434 first = list_first_entry(&ls->ls_scan_list, struct dlm_rsb,
435 res_scan_list);
436 list_del_init(&r->res_scan_list);
437 /* check if the first element was the rsb we deleted */
438 if (first == r) {
439 /* get the new first element; if the list is now
440 * empty, try to delete the timer (if we are too
441 * late we don't care).
442 *
443 * if the list isn't empty and a new first element is
444 * in place, set the new timer expire time.
445 */
446 first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
447 res_scan_list);
448 if (!first)
449 timer_delete(&ls->ls_scan_timer);
450 else
451 enable_scan_timer(ls, first->res_toss_time);
452 }
453
454 out:
455 spin_unlock_bh(&ls->ls_scan_lock);
456 }
457
458 static void add_scan(struct dlm_ls *ls, struct dlm_rsb *r)
459 {
460 int our_nodeid = dlm_our_nodeid();
461 struct dlm_rsb *first;
462
463 /* A dir record for a remote master rsb should never be on the scan list. */
464 WARN_ON(!dlm_no_directory(ls) &&
465 (r->res_master_nodeid != our_nodeid) &&
466 (dlm_dir_nodeid(r) == our_nodeid));
467
468 /* An active rsb should never be on the scan list. */
469 WARN_ON(!rsb_flag(r, RSB_INACTIVE));
470
471 /* An rsb should not already be on the scan list. */
472 WARN_ON(!list_empty(&r->res_scan_list));
473
474 spin_lock_bh(&ls->ls_scan_lock);
475 /* set the new rsb absolute expire time in the rsb */
476 r->res_toss_time = rsb_toss_jiffies();
477 if (list_empty(&ls->ls_scan_list)) {
478 /* if the queue is empty, add the element; its expire
479 * time is our new expire time
480 */
481 list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
482 enable_scan_timer(ls, r->res_toss_time);
483 } else {
484 /* get the (possibly new) first element, then add this
485 * rsb, which expires last, to the end of the queue.
486 * If the list was empty before, this rsb's expire time
487 * is our next expiration; otherwise the existing first
488 * element is our next expiration time
489 */
490 first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
491 res_scan_list);
492 list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
493 if (!first)
494 enable_scan_timer(ls, r->res_toss_time);
495 else
496 enable_scan_timer(ls, first->res_toss_time);
497 }
498 spin_unlock_bh(&ls->ls_scan_lock);
499 }
500
501 /* If we hit contention we retry the trylock after 250 ms.
502 * If there is any other mod_timer in between, we don't care
503 * that it expires earlier again; this is only for the
504 * unlikely case that nothing happened in this time.
505 */
506 #define DLM_TOSS_TIMER_RETRY (jiffies + msecs_to_jiffies(250))
507
508 /* Called by lockspace scan_timer to free unused rsb's. */
509
510 void dlm_rsb_scan(struct timer_list *timer)
511 {
512 struct dlm_ls *ls = timer_container_of(ls, timer, ls_scan_timer);
513 int our_nodeid = dlm_our_nodeid();
514 struct dlm_rsb *r;
515 int rv;
516
517 while (1) {
518 /* interruption point to leave the iteration when
519 * recovery waits for timer_delete_sync(); recovery
520 * will take care of deleting everything on the scan list.
521 */
522 if (dlm_locking_stopped(ls))
523 break;
524
525 rv = spin_trylock(&ls->ls_scan_lock);
526 if (!rv) {
527 /* rearm the retry timer */
528 enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
529 break;
530 }
531
532 r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
533 res_scan_list);
534 if (!r) {
535 /* the next add_scan will enable the timer again */
536 spin_unlock(&ls->ls_scan_lock);
537 break;
538 }
539
540 /*
541 * If the first rsb is not yet expired, then stop because the
542 * list is sorted with nearest expiration first.
543 */
544 if (time_before(jiffies, r->res_toss_time)) {
545 /* rearm with the next rsb to expire in the future */
546 enable_scan_timer(ls, r->res_toss_time);
547 spin_unlock(&ls->ls_scan_lock);
548 break;
549 }
550
551 /* find_rsb_dir/nodir take these locks in the reverse
552 * order; however this is only a trylock, so if we hit
553 * possible contention we simply try again later.
554 */
555 rv = write_trylock(&ls->ls_rsbtbl_lock);
556 if (!rv) {
557 spin_unlock(&ls->ls_scan_lock);
558 /* rearm the retry timer */
559 enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY);
560 break;
561 }
562
563 list_del(&r->res_slow_list);
564 rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
565 dlm_rhash_rsb_params);
566 rsb_clear_flag(r, RSB_HASHED);
567
568 /* ls_rsbtbl_lock is not needed when calling send_remove() */
569 write_unlock(&ls->ls_rsbtbl_lock);
570
571 list_del_init(&r->res_scan_list);
572 spin_unlock(&ls->ls_scan_lock);
573
574 /* An rsb that is a dir record for a remote master rsb
575 * cannot be removed, and should not have a timer enabled.
576 */
577 WARN_ON(!dlm_no_directory(ls) &&
578 (r->res_master_nodeid != our_nodeid) &&
579 (dlm_dir_nodeid(r) == our_nodeid));
580
581 /* We're the master of this rsb but we're not
582 * the directory record, so we need to tell the
583 * dir node to remove the dir record
584 */
585 if (!dlm_no_directory(ls) &&
586 (r->res_master_nodeid == our_nodeid) &&
587 (dlm_dir_nodeid(r) != our_nodeid))
588 send_remove(r);
589
590 free_inactive_rsb(r);
591 }
592 }
593
594 /* Allocate and initialize a new rsb struct for the given name.
595    Returns -ENOMEM if allocation fails. */
597
598 static int get_rsb_struct(struct dlm_ls *ls, const void *name, int len,
599 struct dlm_rsb **r_ret)
600 {
601 struct dlm_rsb *r;
602
603 r = dlm_allocate_rsb();
604 if (!r)
605 return -ENOMEM;
606
607 r->res_ls = ls;
608 r->res_length = len;
609 memcpy(r->res_name, name, len);
610 spin_lock_init(&r->res_lock);
611
612 INIT_LIST_HEAD(&r->res_lookup);
613 INIT_LIST_HEAD(&r->res_grantqueue);
614 INIT_LIST_HEAD(&r->res_convertqueue);
615 INIT_LIST_HEAD(&r->res_waitqueue);
616 INIT_LIST_HEAD(&r->res_root_list);
617 INIT_LIST_HEAD(&r->res_scan_list);
618 INIT_LIST_HEAD(&r->res_recover_list);
619 INIT_LIST_HEAD(&r->res_masters_list);
620
621 *r_ret = r;
622 return 0;
623 }
624
625 int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,
626 struct dlm_rsb **r_ret)
627 {
628 char key[DLM_RESNAME_MAXLEN] = {};
629 if (len > DLM_RESNAME_MAXLEN)
630 return -EINVAL;
631 memcpy(key, name, len);
632 *r_ret = rhashtable_lookup_fast(rhash, &key, dlm_rhash_rsb_params);
633 if (*r_ret)
634 return 0;
635
636 return -EBADR;
637 }
638
639 static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
640 {
641 int rv;
642
643 rv = rhashtable_insert_fast(rhash, &rsb->res_node,
644 dlm_rhash_rsb_params);
645 if (!rv)
646 rsb_set_flag(rsb, RSB_HASHED);
647
648 return rv;
649 }
650
651 /*
652 * Find rsb in rsbtbl and potentially create/add one
653 *
654 * Delaying the release of rsb's has a similar benefit to applications keeping
655 * NL locks on an rsb, but without the guarantee that the cached master value
656 * will still be valid when the rsb is reused. Apps aren't always smart enough
657 * to keep NL locks on an rsb that they may lock again shortly; this can lead
658 * to excessive master lookups and removals if we don't delay the release.
659 *
660 * Searching for an rsb means looking through both the normal list and toss
661 * list. When found on the toss list the rsb is moved to the normal list with
662 * ref count of 1; when found on normal list the ref count is incremented.
663 *
664 * rsb's on the keep list are being used locally and refcounted.
665 * rsb's on the toss list are not being used locally, and are not refcounted.
666 *
667 * The toss list rsb's were either
668 * - previously used locally but not any more (were on keep list, then
669 * moved to toss list when last refcount dropped)
670 * - created and put on toss list as a directory record for a lookup
671 * (we are the dir node for the res, but are not using the res right now,
672 * but some other node is)
673 *
674 * The purpose of find_rsb() is to return a refcounted rsb for local use.
675 * So, if the given rsb is on the toss list, it is moved to the keep list
676 * before being returned.
677 *
678 * deactivate_rsb() happens when all local usage of the rsb is done, i.e. no
679 * more refcounts exist, so the rsb is moved from the keep list to the
680 * toss list.
681 *
682 * rsb's on both keep and toss lists are used for doing a name to master
683 * lookups. rsb's that are in use locally (and being refcounted) are on
684 * the keep list, rsb's that are not in use locally (not refcounted) and
685 * only exist for name/master lookups are on the toss list.
686 *
687 * rsb's on the toss list whose dir_nodeid is not local can have stale
688 * name/master mappings. So, remote requests on such rsb's can potentially
689 * return with an error, which means the mapping is stale and needs to
690 * be updated with a new lookup. (The idea behind MASTER UNCERTAIN and
691 * first_lkid is to keep only a single outstanding request on an rsb
692 * while that rsb has a potentially stale master.)
693 */
694
695 static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
696 uint32_t hash, int dir_nodeid, int from_nodeid,
697 unsigned int flags, struct dlm_rsb **r_ret)
698 {
699 struct dlm_rsb *r = NULL;
700 int our_nodeid = dlm_our_nodeid();
701 int from_local = 0;
702 int from_other = 0;
703 int from_dir = 0;
704 int create = 0;
705 int error;
706
707 if (flags & R_RECEIVE_REQUEST) {
708 if (from_nodeid == dir_nodeid)
709 from_dir = 1;
710 else
711 from_other = 1;
712 } else if (flags & R_REQUEST) {
713 from_local = 1;
714 }
715
716 /*
717 * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so
718 * from_nodeid has sent us a lock in dlm_recover_locks, believing
719 * we're the new master. Our local recovery may not have set
720 * res_master_nodeid to our_nodeid yet, so allow either. Don't
721 * create the rsb; dlm_recover_process_copy() will handle EBADR
722 * by resending.
723 *
724 * If someone sends us a request, we are the dir node, and we do
725 * not find the rsb anywhere, then recreate it. This happens if
726 * someone sends us a request after we have removed/freed an rsb.
727 * (They sent a request instead of lookup because they are using
728 * an rsb taken from their scan list.)
729 */
730
731 if (from_local || from_dir ||
732 (from_other && (dir_nodeid == our_nodeid))) {
733 create = 1;
734 }
735
736 retry:
737 error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
738 if (error)
739 goto do_new;
740
741 /* check if the rsb is active under read lock - likely path */
742 read_lock_bh(&ls->ls_rsbtbl_lock);
743 if (!rsb_flag(r, RSB_HASHED)) {
744 read_unlock_bh(&ls->ls_rsbtbl_lock);
745 error = -EBADR;
746 goto do_new;
747 }
748
749 /*
750 * rsb is active, so we can't check master_nodeid without lock_rsb.
751 */
752
753 if (rsb_flag(r, RSB_INACTIVE)) {
754 read_unlock_bh(&ls->ls_rsbtbl_lock);
755 goto do_inactive;
756 }
757
758 kref_get(&r->res_ref);
759 read_unlock_bh(&ls->ls_rsbtbl_lock);
760 goto out;
761
762
763 do_inactive:
764 write_lock_bh(&ls->ls_rsbtbl_lock);
765
766 /*
767 * The expectation here is that the rsb will have HASHED and
768 * INACTIVE flags set, and that the rsb can be moved from
769 * inactive back to active again. However, between releasing
770 * the read lock and acquiring the write lock, this rsb could
771 * have been removed from rsbtbl, and had HASHED cleared, to
772 * be freed. To deal with this case, we would normally need
773 * to repeat dlm_search_rsb_tree while holding the write lock,
774 * but rcu allows us to simply check the HASHED flag, because
775 * the rcu read lock means the rsb will not be freed yet.
776 * If the HASHED flag is not set, then the rsb is being freed,
777 * so we add a new rsb struct. If the HASHED flag is set,
778 * and INACTIVE is not set, it means another thread has
779 * made the rsb active, as we're expecting to do here, and
780 * we just repeat the lookup (this will be very unlikely.)
781 */
782 if (rsb_flag(r, RSB_HASHED)) {
783 if (!rsb_flag(r, RSB_INACTIVE)) {
784 write_unlock_bh(&ls->ls_rsbtbl_lock);
785 goto retry;
786 }
787 } else {
788 write_unlock_bh(&ls->ls_rsbtbl_lock);
789 error = -EBADR;
790 goto do_new;
791 }
792
793 /*
794 * rsb found inactive (master_nodeid may be out of date unless
795 * we are the dir_nodeid or were the master). No other thread
796 * is using this rsb because it's inactive, so we can
797 * look at or update res_master_nodeid without lock_rsb.
798 */
799
800 if ((r->res_master_nodeid != our_nodeid) && from_other) {
801 /* our rsb was not master, and another node (not the dir node)
802 has sent us a request */
803 log_debug(ls, "find_rsb inactive from_other %d master %d dir %d %s",
804 from_nodeid, r->res_master_nodeid, dir_nodeid,
805 r->res_name);
806 write_unlock_bh(&ls->ls_rsbtbl_lock);
807 error = -ENOTBLK;
808 goto out;
809 }
810
811 if ((r->res_master_nodeid != our_nodeid) && from_dir) {
812 /* don't think this should ever happen */
813 log_error(ls, "find_rsb inactive from_dir %d master %d",
814 from_nodeid, r->res_master_nodeid);
815 dlm_print_rsb(r);
816 /* fix it and go on */
817 r->res_master_nodeid = our_nodeid;
818 r->res_nodeid = 0;
819 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
820 r->res_first_lkid = 0;
821 }
822
823 if (from_local && (r->res_master_nodeid != our_nodeid)) {
824 /* Because we have held no locks on this rsb,
825 res_master_nodeid could have become stale. */
826 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
827 r->res_first_lkid = 0;
828 }
829
830 /* we always deactivate the scan timer for the rsb when
831 * we move it out of the inactive state, as the rsb state
832 * can change and scan timers are only for inactive
833 * rsbs.
834 */
835 del_scan(ls, r);
836 list_move(&r->res_slow_list, &ls->ls_slow_active);
837 rsb_clear_flag(r, RSB_INACTIVE);
838 kref_init(&r->res_ref); /* ref is now used in active state */
839 write_unlock_bh(&ls->ls_rsbtbl_lock);
840
841 goto out;
842
843
844 do_new:
845 /*
846 * rsb not found
847 */
848
849 if (error == -EBADR && !create)
850 goto out;
851
852 error = get_rsb_struct(ls, name, len, &r);
853 if (WARN_ON_ONCE(error))
854 goto out;
855
856 r->res_hash = hash;
857 r->res_dir_nodeid = dir_nodeid;
858 kref_init(&r->res_ref);
859
860 if (from_dir) {
861 /* want to see how often this happens */
862 log_debug(ls, "find_rsb new from_dir %d recreate %s",
863 from_nodeid, r->res_name);
864 r->res_master_nodeid = our_nodeid;
865 r->res_nodeid = 0;
866 goto out_add;
867 }
868
869 if (from_other && (dir_nodeid != our_nodeid)) {
870 /* should never happen */
871 log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
872 from_nodeid, dir_nodeid, our_nodeid, r->res_name);
873 dlm_free_rsb(r);
874 r = NULL;
875 error = -ENOTBLK;
876 goto out;
877 }
878
879 if (from_other) {
880 log_debug(ls, "find_rsb new from_other %d dir %d %s",
881 from_nodeid, dir_nodeid, r->res_name);
882 }
883
884 if (dir_nodeid == our_nodeid) {
885 /* When we are the dir nodeid, we can set the master
886 node immediately */
887 r->res_master_nodeid = our_nodeid;
888 r->res_nodeid = 0;
889 } else {
890 /* set_master will send_lookup to dir_nodeid */
891 r->res_master_nodeid = 0;
892 r->res_nodeid = -1;
893 }
894
895 out_add:
896
897 write_lock_bh(&ls->ls_rsbtbl_lock);
898 error = rsb_insert(r, &ls->ls_rsbtbl);
899 if (error == -EEXIST) {
900 /* somebody else was faster and it seems the
901 * rsb exists now, we do a whole relookup
902 */
903 write_unlock_bh(&ls->ls_rsbtbl_lock);
904 dlm_free_rsb(r);
905 goto retry;
906 } else if (!error) {
907 list_add(&r->res_slow_list, &ls->ls_slow_active);
908 }
909 write_unlock_bh(&ls->ls_rsbtbl_lock);
910 out:
911 *r_ret = r;
912 return error;
913 }
914
915 /* During recovery, other nodes can send us new MSTCPY locks (from
916 dlm_recover_locks) before we've made ourselves master (in
917 dlm_recover_masters). */
918
919 static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
920 uint32_t hash, int dir_nodeid, int from_nodeid,
921 unsigned int flags, struct dlm_rsb **r_ret)
922 {
923 struct dlm_rsb *r = NULL;
924 int our_nodeid = dlm_our_nodeid();
925 int recover = (flags & R_RECEIVE_RECOVER);
926 int error;
927
928 retry:
929 error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
930 if (error)
931 goto do_new;
932
933 /* check if the rsb is in active state under read lock - likely path */
934 read_lock_bh(&ls->ls_rsbtbl_lock);
935 if (!rsb_flag(r, RSB_HASHED)) {
936 read_unlock_bh(&ls->ls_rsbtbl_lock);
937 goto do_new;
938 }
939
940 if (rsb_flag(r, RSB_INACTIVE)) {
941 read_unlock_bh(&ls->ls_rsbtbl_lock);
942 goto do_inactive;
943 }
944
945 /*
946 * rsb is active, so we can't check master_nodeid without lock_rsb.
947 */
948
949 kref_get(&r->res_ref);
950 read_unlock_bh(&ls->ls_rsbtbl_lock);
951
952 goto out;
953
954
955 do_inactive:
956 write_lock_bh(&ls->ls_rsbtbl_lock);
957
958 /* See comment in find_rsb_dir. */
959 if (rsb_flag(r, RSB_HASHED)) {
960 if (!rsb_flag(r, RSB_INACTIVE)) {
961 write_unlock_bh(&ls->ls_rsbtbl_lock);
962 goto retry;
963 }
964 } else {
965 write_unlock_bh(&ls->ls_rsbtbl_lock);
966 goto do_new;
967 }
968
969
970 /*
971 * rsb found inactive. No other thread is using this rsb because
972 * it's inactive, so we can look at or update res_master_nodeid
973 * without lock_rsb.
974 */
975
976 if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) {
977 /* our rsb is not master, and another node has sent us a
978 request; this should never happen */
979 log_error(ls, "find_rsb inactive from_nodeid %d master %d dir %d",
980 from_nodeid, r->res_master_nodeid, dir_nodeid);
981 dlm_print_rsb(r);
982 write_unlock_bh(&ls->ls_rsbtbl_lock);
983 error = -ENOTBLK;
984 goto out;
985 }
986
987 if (!recover && (r->res_master_nodeid != our_nodeid) &&
988 (dir_nodeid == our_nodeid)) {
989 /* our rsb is not master, and we are dir; may as well fix it;
990 this should never happen */
991 log_error(ls, "find_rsb inactive our %d master %d dir %d",
992 our_nodeid, r->res_master_nodeid, dir_nodeid);
993 dlm_print_rsb(r);
994 r->res_master_nodeid = our_nodeid;
995 r->res_nodeid = 0;
996 }
997
998 del_scan(ls, r);
999 list_move(&r->res_slow_list, &ls->ls_slow_active);
1000 rsb_clear_flag(r, RSB_INACTIVE);
1001 kref_init(&r->res_ref);
1002 write_unlock_bh(&ls->ls_rsbtbl_lock);
1003
1004 goto out;
1005
1006
1007 do_new:
1008 /*
1009 * rsb not found
1010 */
1011
1012 error = get_rsb_struct(ls, name, len, &r);
1013 if (WARN_ON_ONCE(error))
1014 goto out;
1015
1016 r->res_hash = hash;
1017 r->res_dir_nodeid = dir_nodeid;
1018 r->res_master_nodeid = dir_nodeid;
1019 r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
1020 kref_init(&r->res_ref);
1021
1022 write_lock_bh(&ls->ls_rsbtbl_lock);
1023 error = rsb_insert(r, &ls->ls_rsbtbl);
1024 if (error == -EEXIST) {
1025 /* somebody else was faster and it seems the
1026 * rsb exists now, we do a whole relookup
1027 */
1028 write_unlock_bh(&ls->ls_rsbtbl_lock);
1029 dlm_free_rsb(r);
1030 goto retry;
1031 } else if (!error) {
1032 list_add(&r->res_slow_list, &ls->ls_slow_active);
1033 }
1034 write_unlock_bh(&ls->ls_rsbtbl_lock);
1035
1036 out:
1037 *r_ret = r;
1038 return error;
1039 }
1040
1041 /*
1042 * rsb rcu usage
1043 *
1044 * While rcu read lock is held, the rsb cannot be freed,
1045 * which allows a lookup optimization.
1046 *
1047 * Two threads are accessing the same rsb concurrently,
1048 * the first (A) is trying to use the rsb, the second (B)
1049 * is trying to free the rsb.
1050 *
1051 * thread A thread B
1052 * (trying to use rsb) (trying to free rsb)
1053 *
1054 * A1. rcu read lock
1055 * A2. rsbtbl read lock
1056 * A3. look up rsb in rsbtbl
1057 * A4. rsbtbl read unlock
1058 * B1. rsbtbl write lock
1059 * B2. look up rsb in rsbtbl
1060 * B3. remove rsb from rsbtbl
1061 * B4. clear rsb HASHED flag
1062 * B5. rsbtbl write unlock
1063 * B6. begin freeing rsb using rcu...
1064 *
1065 * (rsb is inactive, so try to make it active again)
1066 * A5. read rsb HASHED flag (safe because rsb is not freed yet)
1067 * A6. the rsb HASHED flag is not set, which means the rsb
1068 * is being removed from rsbtbl and freed, so don't use it.
1069 * A7. rcu read unlock
1070 *
1071 * B7. ...finish freeing rsb using rcu
1072 * A8. create a new rsb
1073 *
1074 * Without the rcu optimization, steps A5-8 would need to do
1075 * an extra rsbtbl lookup:
1076 * A5. rsbtbl write lock
1077 * A6. look up rsb in rsbtbl, not found
1078 * A7. rsbtbl write unlock
1079 * A8. create a new rsb
1080 */
1081
1082 static int find_rsb(struct dlm_ls *ls, const void *name, int len,
1083 int from_nodeid, unsigned int flags,
1084 struct dlm_rsb **r_ret)
1085 {
1086 int dir_nodeid;
1087 uint32_t hash;
1088 int rv;
1089
1090 if (len > DLM_RESNAME_MAXLEN)
1091 return -EINVAL;
1092
1093 hash = jhash(name, len, 0);
1094 dir_nodeid = dlm_hash2nodeid(ls, hash);
1095
1096 rcu_read_lock();
1097 if (dlm_no_directory(ls))
1098 rv = find_rsb_nodir(ls, name, len, hash, dir_nodeid,
1099 from_nodeid, flags, r_ret);
1100 else
1101 rv = find_rsb_dir(ls, name, len, hash, dir_nodeid,
1102 from_nodeid, flags, r_ret);
1103 rcu_read_unlock();
1104 return rv;
1105 }
1106
1107 /* we have received a request and found that res_master_nodeid != our_nodeid,
1108 so we need to return an error or make ourselves the master */
1109
1110 static int validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r,
1111 int from_nodeid)
1112 {
1113 if (dlm_no_directory(ls)) {
1114 log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
1115 from_nodeid, r->res_master_nodeid,
1116 r->res_dir_nodeid);
1117 dlm_print_rsb(r);
1118 return -ENOTBLK;
1119 }
1120
1121 if (from_nodeid != r->res_dir_nodeid) {
1122 /* our rsb is not master, and another node (not the dir node)
1123 has sent us a request. this is much more common when our
1124 master_nodeid is zero, so limit debug to non-zero. */
1125
1126 if (r->res_master_nodeid) {
1127 log_debug(ls, "validate master from_other %d master %d "
1128 "dir %d first %x %s", from_nodeid,
1129 r->res_master_nodeid, r->res_dir_nodeid,
1130 r->res_first_lkid, r->res_name);
1131 }
1132 return -ENOTBLK;
1133 } else {
1134 /* our rsb is not master, but the dir nodeid has sent us a
1135 request; this could happen with master 0 / res_nodeid -1 */
1136
1137 if (r->res_master_nodeid) {
1138 log_error(ls, "validate master from_dir %d master %d "
1139 "first %x %s",
1140 from_nodeid, r->res_master_nodeid,
1141 r->res_first_lkid, r->res_name);
1142 }
1143
1144 r->res_master_nodeid = dlm_our_nodeid();
1145 r->res_nodeid = 0;
1146 return 0;
1147 }
1148 }
1149
1150 static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid,
1151 int from_nodeid, bool is_inactive, unsigned int flags,
1152 int *r_nodeid, int *result)
1153 {
1154 int fix_master = (flags & DLM_LU_RECOVER_MASTER);
1155 int from_master = (flags & DLM_LU_RECOVER_DIR);
1156
1157 if (r->res_dir_nodeid != our_nodeid) {
1158 /* should not happen, but may as well fix it and carry on */
1159 log_error(ls, "%s res_dir %d our %d %s", __func__,
1160 r->res_dir_nodeid, our_nodeid, r->res_name);
1161 r->res_dir_nodeid = our_nodeid;
1162 }
1163
1164 if (fix_master && r->res_master_nodeid && dlm_is_removed(ls, r->res_master_nodeid)) {
1165 /* Recovery uses this function to set a new master when
1166 * the previous master failed. Setting NEW_MASTER will
1167 * force dlm_recover_masters to call recover_master on this
1168 * rsb even though the res_nodeid is no longer removed.
1169 */
1170
1171 r->res_master_nodeid = from_nodeid;
1172 r->res_nodeid = from_nodeid;
1173 rsb_set_flag(r, RSB_NEW_MASTER);
1174
1175 if (is_inactive) {
1176 /* I don't think we should ever find it inactive. */
1177 log_error(ls, "%s fix_master inactive", __func__);
1178 dlm_dump_rsb(r);
1179 }
1180 }
1181
1182 if (from_master && (r->res_master_nodeid != from_nodeid)) {
1183 /* this will happen if from_nodeid became master during
1184 * a previous recovery cycle, and we aborted the previous
1185 * cycle before recovering this master value
1186 */
1187
1188 log_limit(ls, "%s from_master %d master_nodeid %d res_nodeid %d first %x %s",
1189 __func__, from_nodeid, r->res_master_nodeid,
1190 r->res_nodeid, r->res_first_lkid, r->res_name);
1191
1192 if (r->res_master_nodeid == our_nodeid) {
1193 log_error(ls, "from_master %d our_master", from_nodeid);
1194 dlm_dump_rsb(r);
1195 goto ret_assign;
1196 }
1197
1198 r->res_master_nodeid = from_nodeid;
1199 r->res_nodeid = from_nodeid;
1200 rsb_set_flag(r, RSB_NEW_MASTER);
1201 }
1202
1203 if (!r->res_master_nodeid) {
1204 /* this will happen if recovery happens while we're looking
1205 * up the master for this rsb
1206 */
1207
1208 log_debug(ls, "%s master 0 to %d first %x %s", __func__,
1209 from_nodeid, r->res_first_lkid, r->res_name);
1210 r->res_master_nodeid = from_nodeid;
1211 r->res_nodeid = from_nodeid;
1212 }
1213
1214 if (!from_master && !fix_master &&
1215 (r->res_master_nodeid == from_nodeid)) {
1216 /* this can happen when the master sends remove, the dir node
1217 * finds the rsb on the active list and ignores the remove,
1218 * and the former master sends a lookup
1219 */
1220
1221 log_limit(ls, "%s from master %d flags %x first %x %s",
1222 __func__, from_nodeid, flags, r->res_first_lkid,
1223 r->res_name);
1224 }
1225
1226 ret_assign:
1227 *r_nodeid = r->res_master_nodeid;
1228 if (result)
1229 *result = DLM_LU_MATCH;
1230 }
1231
1232 /*
1233 * We're the dir node for this res and another node wants to know the
1234 * master nodeid. During normal operation (non recovery) this is only
1235 * called from receive_lookup(); master lookups when the local node is
1236 * the dir node are done by find_rsb().
1237 *
1238 * normal operation, we are the dir node for a resource
1239 * . _request_lock
1240 * . set_master
1241 * . send_lookup
1242 * . receive_lookup
1243 * . dlm_master_lookup flags 0
1244 *
1245 * recover directory, we are rebuilding dir for all resources
1246 * . dlm_recover_directory
1247 * . dlm_rcom_names
1248 * remote node sends back the rsb names it is master of and we are dir of
1249 * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1)
1250 * we either create new rsb setting remote node as master, or find existing
1251 * rsb and set master to be the remote node.
1252 *
1253 * recover masters, we are finding the new master for resources
1254 * . dlm_recover_masters
1255 * . recover_master
1256 * . dlm_send_rcom_lookup
1257 * . receive_rcom_lookup
1258 * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
1259 */
1260
1261 static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
1262 int len, unsigned int flags, int *r_nodeid, int *result)
1263 {
1264 struct dlm_rsb *r = NULL;
1265 uint32_t hash;
1266 int our_nodeid = dlm_our_nodeid();
1267 int dir_nodeid, error;
1268
1269 if (len > DLM_RESNAME_MAXLEN)
1270 return -EINVAL;
1271
1272 if (from_nodeid == our_nodeid) {
1273 log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
1274 our_nodeid, flags);
1275 return -EINVAL;
1276 }
1277
1278 hash = jhash(name, len, 0);
1279 dir_nodeid = dlm_hash2nodeid(ls, hash);
1280 if (dir_nodeid != our_nodeid) {
1281 log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
1282 from_nodeid, dir_nodeid, our_nodeid, hash,
1283 ls->ls_num_nodes);
1284 *r_nodeid = -1;
1285 return -EINVAL;
1286 }
1287
1288 retry:
1289 error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1290 if (error)
1291 goto not_found;
1292
1293 /* check if the rsb is active under read lock - likely path */
1294 read_lock_bh(&ls->ls_rsbtbl_lock);
1295 if (!rsb_flag(r, RSB_HASHED)) {
1296 read_unlock_bh(&ls->ls_rsbtbl_lock);
1297 goto not_found;
1298 }
1299
1300 if (rsb_flag(r, RSB_INACTIVE)) {
1301 read_unlock_bh(&ls->ls_rsbtbl_lock);
1302 goto do_inactive;
1303 }
1304
1305 /* because the rsb is active, we need to lock_rsb before
1306 * checking/changing res_master_nodeid
1307 */
1308
1309 hold_rsb(r);
1310 read_unlock_bh(&ls->ls_rsbtbl_lock);
1311 lock_rsb(r);
1312
1313 __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, false,
1314 flags, r_nodeid, result);
1315
1316 /* the rsb was active */
1317 unlock_rsb(r);
1318 put_rsb(r);
1319
1320 return 0;
1321
1322 do_inactive:
1323 /* unlikely path - check if still part of ls_rsbtbl */
1324 write_lock_bh(&ls->ls_rsbtbl_lock);
1325
1326 /* see comment in find_rsb_dir */
1327 if (rsb_flag(r, RSB_HASHED)) {
1328 if (!rsb_flag(r, RSB_INACTIVE)) {
1329 write_unlock_bh(&ls->ls_rsbtbl_lock);
1330 /* something has changed, very unlikely but
1331 * try again
1332 */
1333 goto retry;
1334 }
1335 } else {
1336 write_unlock_bh(&ls->ls_rsbtbl_lock);
1337 goto not_found;
1338 }
1339
1340 /* because the rsb is inactive, it's not refcounted and lock_rsb
1341 is not used, but is protected by the rsbtbl lock */
1342
1343 __dlm_master_lookup(ls, r, our_nodeid, from_nodeid, true, flags,
1344 r_nodeid, result);
1345
1346 /* A dir record rsb should never be on the scan list,
1347 * except when we are both the dir and master node.
1348 * This function should only be called by the dir
1349 * node.
1350 */
1351 WARN_ON(!list_empty(&r->res_scan_list) &&
1352 r->res_master_nodeid != our_nodeid);
1353
1354 write_unlock_bh(&ls->ls_rsbtbl_lock);
1355
1356 return 0;
1357
1358 not_found:
1359 error = get_rsb_struct(ls, name, len, &r);
1360 if (WARN_ON_ONCE(error))
1361 goto out;
1362
1363 r->res_hash = hash;
1364 r->res_dir_nodeid = our_nodeid;
1365 r->res_master_nodeid = from_nodeid;
1366 r->res_nodeid = from_nodeid;
1367 rsb_set_flag(r, RSB_INACTIVE);
1368
1369 write_lock_bh(&ls->ls_rsbtbl_lock);
1370 error = rsb_insert(r, &ls->ls_rsbtbl);
1371 if (error == -EEXIST) {
1372 /* somebody else was faster and it seems the
1373 * rsb exists now, we do a whole relookup
1374 */
1375 write_unlock_bh(&ls->ls_rsbtbl_lock);
1376 dlm_free_rsb(r);
1377 goto retry;
1378 } else if (error) {
1379 write_unlock_bh(&ls->ls_rsbtbl_lock);
1380 /* should never happen */
1381 dlm_free_rsb(r);
1382 goto retry;
1383 }
1384
1385 list_add(&r->res_slow_list, &ls->ls_slow_inactive);
1386 write_unlock_bh(&ls->ls_rsbtbl_lock);
1387
1388 if (result)
1389 *result = DLM_LU_ADD;
1390 *r_nodeid = from_nodeid;
1391 out:
1392 return error;
1393 }
1394
1395 int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
1396 int len, unsigned int flags, int *r_nodeid, int *result)
1397 {
1398 int rv;
1399 rcu_read_lock();
1400 rv = _dlm_master_lookup(ls, from_nodeid, name, len, flags, r_nodeid, result);
1401 rcu_read_unlock();
1402 return rv;
1403 }
1404
1405 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
1406 {
1407 struct dlm_rsb *r;
1408
1409 read_lock_bh(&ls->ls_rsbtbl_lock);
1410 list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
1411 if (r->res_hash == hash)
1412 dlm_dump_rsb(r);
1413 }
1414 read_unlock_bh(&ls->ls_rsbtbl_lock);
1415 }
1416
1417 void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
1418 {
1419 struct dlm_rsb *r = NULL;
1420 int error;
1421
1422 rcu_read_lock();
1423 error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
1424 if (error)
1425 goto out;
1426
1427 dlm_dump_rsb(r);
1428 out:
1429 rcu_read_unlock();
1430 }
1431
1432 static void deactivate_rsb(struct kref *kref)
1433 {
1434 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
1435 struct dlm_ls *ls = r->res_ls;
1436 int our_nodeid = dlm_our_nodeid();
1437
1438 DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
1439 rsb_set_flag(r, RSB_INACTIVE);
1440 list_move(&r->res_slow_list, &ls->ls_slow_inactive);
1441
1442 /*
1443 * When the rsb becomes unused, there are two possibilities:
1444 * 1. Leave the inactive rsb in place (don't remove it).
1445 * 2. Add it to the scan list to be removed.
1446 *
1447 * 1 is done when the rsb is acting as the dir record
1448 * for a remotely mastered rsb. The rsb must be left
1449 * in place as an inactive rsb to act as the dir record.
1450 *
1451 * 2 is done when a) the rsb is not the master and not the
1452 * dir record, b) when the rsb is both the master and the
1453 * dir record, c) when the rsb is master but not dir record.
1454 *
1455 * (If no directory is used, the rsb can always be removed.)
1456 */
1457 if (dlm_no_directory(ls) ||
1458 (r->res_master_nodeid == our_nodeid ||
1459 dlm_dir_nodeid(r) != our_nodeid))
1460 add_scan(ls, r);
1461
1462 if (r->res_lvbptr) {
1463 dlm_free_lvb(r->res_lvbptr);
1464 r->res_lvbptr = NULL;
1465 }
1466 }
1467
1468 void free_inactive_rsb(struct dlm_rsb *r)
1469 {
1470 WARN_ON_ONCE(!rsb_flag(r, RSB_INACTIVE));
1471
1472 DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
1473 DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
1474 DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
1475 DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
1476 DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
1477 DLM_ASSERT(list_empty(&r->res_scan_list), dlm_dump_rsb(r););
1478 DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
1479 DLM_ASSERT(list_empty(&r->res_masters_list), dlm_dump_rsb(r););
1480
1481 dlm_free_rsb(r);
1482 }
1483
1484 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
1485 The rsb must exist as long as any lkb's for it do. */
1486
1487 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1488 {
1489 hold_rsb(r);
1490 lkb->lkb_resource = r;
1491 }
1492
1493 static void detach_lkb(struct dlm_lkb *lkb)
1494 {
1495 if (lkb->lkb_resource) {
1496 put_rsb(lkb->lkb_resource);
1497 lkb->lkb_resource = NULL;
1498 }
1499 }
1500
1501 static int _create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret,
1502 unsigned long start, unsigned long end)
1503 {
1504 struct xa_limit limit;
1505 struct dlm_lkb *lkb;
1506 int rv;
1507
1508 limit.max = end;
1509 limit.min = start;
1510
1511 lkb = dlm_allocate_lkb();
1512 if (!lkb)
1513 return -ENOMEM;
1514
1515 lkb->lkb_last_bast_cb_mode = DLM_LOCK_IV;
1516 lkb->lkb_last_cast_cb_mode = DLM_LOCK_IV;
1517 lkb->lkb_last_cb_mode = DLM_LOCK_IV;
1518 lkb->lkb_nodeid = -1;
1519 lkb->lkb_grmode = DLM_LOCK_IV;
1520 kref_init(&lkb->lkb_ref);
1521 INIT_LIST_HEAD(&lkb->lkb_ownqueue);
1522 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
1523
1524 write_lock_bh(&ls->ls_lkbxa_lock);
1525 rv = xa_alloc(&ls->ls_lkbxa, &lkb->lkb_id, lkb, limit, GFP_ATOMIC);
1526 write_unlock_bh(&ls->ls_lkbxa_lock);
1527
1528 if (rv < 0) {
1529 log_error(ls, "create_lkb xa error %d", rv);
1530 dlm_free_lkb(lkb);
1531 return rv;
1532 }
1533
1534 *lkb_ret = lkb;
1535 return 0;
1536 }
1537
1538 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
1539 {
1540 return _create_lkb(ls, lkb_ret, 1, ULONG_MAX);
1541 }
1542
1543 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
1544 {
1545 struct dlm_lkb *lkb;
1546
1547 rcu_read_lock();
1548 lkb = xa_load(&ls->ls_lkbxa, lkid);
1549 if (lkb) {
1550 /* check if lkb is still part of lkbxa under lkbxa_lock as
1551 * the lkb_ref is tied to the lkbxa data structure, see
1552 * __put_lkb().
1553 */
1554 read_lock_bh(&ls->ls_lkbxa_lock);
1555 if (kref_read(&lkb->lkb_ref))
1556 kref_get(&lkb->lkb_ref);
1557 else
1558 lkb = NULL;
1559 read_unlock_bh(&ls->ls_lkbxa_lock);
1560 }
1561 rcu_read_unlock();
1562
1563 *lkb_ret = lkb;
1564 return lkb ? 0 : -ENOENT;
1565 }
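/* Illustrative lookup/release pairing (the usual pattern in the message
 * handlers that use find_lkb()):
 *
 *	error = find_lkb(ls, lkid, &lkb);
 *	if (error)
 *		return error;
 *	... operate on lkb ...
 *	dlm_put_lkb(lkb);
 *
 * find_lkb() takes a reference via kref_get(), so every successful
 * lookup must be balanced with dlm_put_lkb().
 */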
1566
1567 static void kill_lkb(struct kref *kref)
1568 {
1569 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1570
1571 /* All work is done after the return from kref_put() so we
1572 can release the write_lock before the detach_lkb */
1573
1574 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1575 }
1576
1577 /* __put_lkb() is used when an lkb may not have an rsb attached to
1578 it so we need to provide the lockspace explicitly */
1579
1580 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
1581 {
1582 uint32_t lkid = lkb->lkb_id;
1583 int rv;
1584
1585 rv = dlm_kref_put_write_lock_bh(&lkb->lkb_ref, kill_lkb,
1586 &ls->ls_lkbxa_lock);
1587 if (rv) {
1588 xa_erase(&ls->ls_lkbxa, lkid);
1589 write_unlock_bh(&ls->ls_lkbxa_lock);
1590
1591 detach_lkb(lkb);
1592
1593 /* for local/process lkbs, lvbptr points to caller's lksb */
1594 if (lkb->lkb_lvbptr && is_master_copy(lkb))
1595 dlm_free_lvb(lkb->lkb_lvbptr);
1596 dlm_free_lkb(lkb);
1597 }
1598
1599 return rv;
1600 }
1601
1602 int dlm_put_lkb(struct dlm_lkb *lkb)
1603 {
1604 struct dlm_ls *ls;
1605
1606 DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
1607 DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
1608
1609 ls = lkb->lkb_resource->res_ls;
1610 return __put_lkb(ls, lkb);
1611 }
1612
1613 /* This is only called to add a reference when the code already holds
1614 a valid reference to the lkb, so there's no need for locking. */
1615
1616 static inline void hold_lkb(struct dlm_lkb *lkb)
1617 {
1618 kref_get(&lkb->lkb_ref);
1619 }
1620
1621 static void unhold_lkb_assert(struct kref *kref)
1622 {
1623 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
1624
1625 DLM_ASSERT(false, dlm_print_lkb(lkb););
1626 }
1627
1628 /* This is called when we need to remove a reference and are certain
1629 it's not the last ref. e.g. del_lkb is always called between a
1630 find_lkb/put_lkb and is always the inverse of a previous add_lkb.
1631 put_lkb would work fine, but would involve unnecessary locking */
1632
1633 static inline void unhold_lkb(struct dlm_lkb *lkb)
1634 {
1635 kref_put(&lkb->lkb_ref, unhold_lkb_assert);
1636 }
1637
1638 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
1639 int mode)
1640 {
1641 struct dlm_lkb *lkb = NULL, *iter;
1642
1643 list_for_each_entry(iter, head, lkb_statequeue)
1644 if (iter->lkb_rqmode < mode) {
1645 lkb = iter;
1646 list_add_tail(new, &iter->lkb_statequeue);
1647 break;
1648 }
1649
1650 if (!lkb)
1651 list_add_tail(new, head);
1652 }
1653
1654 /* add/remove lkb to rsb's grant/convert/wait queue */
1655
1656 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
1657 {
1658 kref_get(&lkb->lkb_ref);
1659
1660 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
1661
1662 lkb->lkb_timestamp = ktime_get();
1663
1664 lkb->lkb_status = status;
1665
1666 switch (status) {
1667 case DLM_LKSTS_WAITING:
1668 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1669 list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
1670 else
1671 list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
1672 break;
1673 case DLM_LKSTS_GRANTED:
1674 /* convention says granted locks kept in order of grmode */
1675 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
1676 lkb->lkb_grmode);
1677 break;
1678 case DLM_LKSTS_CONVERT:
1679 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
1680 list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
1681 else
1682 list_add_tail(&lkb->lkb_statequeue,
1683 &r->res_convertqueue);
1684 break;
1685 default:
1686 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
1687 }
1688 }
1689
1690 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
1691 {
1692 lkb->lkb_status = 0;
1693 list_del(&lkb->lkb_statequeue);
1694 unhold_lkb(lkb);
1695 }
1696
1697 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
1698 {
1699 del_lkb(r, lkb);
1700 add_lkb(r, lkb, sts);
1701 }
1702
1703 static int msg_reply_type(int mstype)
1704 {
1705 switch (mstype) {
1706 case DLM_MSG_REQUEST:
1707 return DLM_MSG_REQUEST_REPLY;
1708 case DLM_MSG_CONVERT:
1709 return DLM_MSG_CONVERT_REPLY;
1710 case DLM_MSG_UNLOCK:
1711 return DLM_MSG_UNLOCK_REPLY;
1712 case DLM_MSG_CANCEL:
1713 return DLM_MSG_CANCEL_REPLY;
1714 case DLM_MSG_LOOKUP:
1715 return DLM_MSG_LOOKUP_REPLY;
1716 }
1717 return -1;
1718 }
1719
1720 /* add/remove lkb from global waiters list of lkb's waiting for
1721 a reply from a remote node */
1722
1723 static void add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
1724 {
1725 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1726
1727 spin_lock_bh(&ls->ls_waiters_lock);
1728 if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
1729 switch (mstype) {
1730 case DLM_MSG_UNLOCK:
1731 set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
1732 break;
1733 case DLM_MSG_CANCEL:
1734 set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
1735 break;
1736 default:
1737 /* should never happen as validate_lock_args() checks
1738 * on lkb_wait_type and validate_unlock_args() only
1739 * creates UNLOCK or CANCEL messages.
1740 */
1741 WARN_ON_ONCE(1);
1742 goto out;
1743 }
1744 lkb->lkb_wait_count++;
1745 hold_lkb(lkb);
1746
1747 log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
1748 lkb->lkb_id, lkb->lkb_wait_type, mstype,
1749 lkb->lkb_wait_count, dlm_iflags_val(lkb));
1750 goto out;
1751 }
1752
1753 DLM_ASSERT(!lkb->lkb_wait_count,
1754 dlm_print_lkb(lkb);
1755 printk("wait_count %d\n", lkb->lkb_wait_count););
1756
1757 lkb->lkb_wait_count++;
1758 lkb->lkb_wait_type = mstype;
1759 lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
1760 hold_lkb(lkb);
1761 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
1762 out:
1763 spin_unlock_bh(&ls->ls_waiters_lock);
1764 }
1765
1766 /* We clear the RESEND flag because we might be taking an lkb off the waiters
1767 list as part of process_requestqueue (e.g. a lookup that has an optimized
1768 request reply on the requestqueue) between dlm_recover_waiters_pre() which
1769 set RESEND and dlm_recover_waiters_post() */
1770
1771 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
1772 const struct dlm_message *ms)
1773 {
1774 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1775 int overlap_done = 0;
1776
1777 if (mstype == DLM_MSG_UNLOCK_REPLY &&
1778 test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
1779 log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
1780 overlap_done = 1;
1781 goto out_del;
1782 }
1783
1784 if (mstype == DLM_MSG_CANCEL_REPLY &&
1785 test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1786 log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
1787 overlap_done = 1;
1788 goto out_del;
1789 }
1790
1791 /* Cancel state was preemptively cleared by a successful convert,
1792 see next comment, nothing to do. */
1793
1794 if ((mstype == DLM_MSG_CANCEL_REPLY) &&
1795 (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
1796 log_debug(ls, "remwait %x cancel_reply wait_type %d",
1797 lkb->lkb_id, lkb->lkb_wait_type);
1798 return -1;
1799 }
1800
1801 /* Remove for the convert reply, and preemptively remove for the
1802 cancel reply. A convert has been granted while there's still
1803 an outstanding cancel on it (the cancel is moot and the result
1804 in the cancel reply should be 0). We preempt the cancel reply
1805 because the app gets the convert result and then can follow up
1806 with another op, like convert. This subsequent op would see the
1807 lingering state of the cancel and fail with -EBUSY. */
1808
1809 if ((mstype == DLM_MSG_CONVERT_REPLY) &&
1810 (lkb->lkb_wait_type == DLM_MSG_CONVERT) && ms && !ms->m_result &&
1811 test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags)) {
1812 log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1813 lkb->lkb_id);
1814 lkb->lkb_wait_type = 0;
1815 lkb->lkb_wait_count--;
1816 unhold_lkb(lkb);
1817 goto out_del;
1818 }
1819
1820 /* N.B. type of reply may not always correspond to type of original
1821 msg due to lookup->request optimization, verify others? */
1822
1823 if (lkb->lkb_wait_type) {
1824 lkb->lkb_wait_type = 0;
1825 goto out_del;
1826 }
1827
1828 log_error(ls, "remwait error %x remote %d %x msg %d flags %x no wait",
1829 lkb->lkb_id, ms ? le32_to_cpu(ms->m_header.h_nodeid) : 0,
1830 lkb->lkb_remid, mstype, dlm_iflags_val(lkb));
1831 return -1;
1832
1833 out_del:
1834 /* the force-unlock/cancel has completed and we haven't recvd a reply
1835 to the op that was in progress prior to the unlock/cancel; we
1836 give up on any reply to the earlier op. FIXME: not sure when/how
1837 this would happen */
1838
1839 if (overlap_done && lkb->lkb_wait_type) {
1840 log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1841 lkb->lkb_id, mstype, lkb->lkb_wait_type);
1842 lkb->lkb_wait_count--;
1843 unhold_lkb(lkb);
1844 lkb->lkb_wait_type = 0;
1845 }
1846
1847 DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1848
1849 clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
1850 lkb->lkb_wait_count--;
1851 if (!lkb->lkb_wait_count)
1852 list_del_init(&lkb->lkb_wait_reply);
1853 unhold_lkb(lkb);
1854 return 0;
1855 }
1856
1857 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1858 {
1859 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1860 int error;
1861
1862 spin_lock_bh(&ls->ls_waiters_lock);
1863 error = _remove_from_waiters(lkb, mstype, NULL);
1864 spin_unlock_bh(&ls->ls_waiters_lock);
1865 return error;
1866 }
1867
1868 /* Handles situations where we might be processing a "fake" or "local" reply in
1869 * the recovery context, which stops any locking activity. Only debugfs might
1870 * change the lockspace waiters, but it will hold the recovery lock to ensure
1871 * that remove_from_waiters_ms() in the local case is the only user manipulating
1872 * the lockspace waiters in the recovery context.
1873 */
1874
1875 static int remove_from_waiters_ms(struct dlm_lkb *lkb,
1876 const struct dlm_message *ms, bool local)
1877 {
1878 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1879 int error;
1880
1881 if (!local)
1882 spin_lock_bh(&ls->ls_waiters_lock);
1883 else
1884 WARN_ON_ONCE(!rwsem_is_locked(&ls->ls_in_recovery) ||
1885 !dlm_locking_stopped(ls));
1886 error = _remove_from_waiters(lkb, le32_to_cpu(ms->m_type), ms);
1887 if (!local)
1888 spin_unlock_bh(&ls->ls_waiters_lock);
1889 return error;
1890 }
1891
1892 /* lkb is master or local copy */
1893
1894 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1895 {
1896 int b, len = r->res_ls->ls_lvblen;
1897
1898 /* b=1 lvb returned to caller
1899 b=0 lvb written to rsb or invalidated
1900 b=-1 do nothing */
1901
1902 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1903
1904 if (b == 1) {
1905 if (!lkb->lkb_lvbptr)
1906 return;
1907
1908 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1909 return;
1910
1911 if (!r->res_lvbptr)
1912 return;
1913
1914 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1915 lkb->lkb_lvbseq = r->res_lvbseq;
1916
1917 } else if (b == 0) {
1918 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1919 rsb_set_flag(r, RSB_VALNOTVALID);
1920 return;
1921 }
1922
1923 if (!lkb->lkb_lvbptr)
1924 return;
1925
1926 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1927 return;
1928
1929 if (!r->res_lvbptr)
1930 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1931
1932 if (!r->res_lvbptr)
1933 return;
1934
1935 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1936 r->res_lvbseq++;
1937 lkb->lkb_lvbseq = r->res_lvbseq;
1938 rsb_clear_flag(r, RSB_VALNOTVALID);
1939 }
1940
1941 if (rsb_flag(r, RSB_VALNOTVALID))
1942 set_bit(DLM_SBF_VALNOTVALID_BIT, &lkb->lkb_sbflags);
1943 }
1944
1945 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1946 {
1947 if (lkb->lkb_grmode < DLM_LOCK_PW)
1948 return;
1949
1950 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1951 rsb_set_flag(r, RSB_VALNOTVALID);
1952 return;
1953 }
1954
1955 if (!lkb->lkb_lvbptr)
1956 return;
1957
1958 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1959 return;
1960
1961 if (!r->res_lvbptr)
1962 r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1963
1964 if (!r->res_lvbptr)
1965 return;
1966
1967 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1968 r->res_lvbseq++;
1969 rsb_clear_flag(r, RSB_VALNOTVALID);
1970 }
1971
1972 /* lkb is process copy (pc) */
1973
1974 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1975 const struct dlm_message *ms)
1976 {
1977 int b;
1978
1979 if (!lkb->lkb_lvbptr)
1980 return;
1981
1982 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1983 return;
1984
1985 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1986 if (b == 1) {
1987 int len = receive_extralen(ms);
1988 if (len > r->res_ls->ls_lvblen)
1989 len = r->res_ls->ls_lvblen;
1990 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1991 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
1992 }
1993 }
1994
1995 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1996 remove_lock -- used for unlock, removes lkb from granted
1997 revert_lock -- used for cancel, moves lkb from convert to granted
1998 grant_lock -- used for request and convert, adds lkb to granted or
1999 moves lkb from convert or waiting to granted
2000
2001 Each of these is used for master or local copy lkb's. There is
2002 also a _pc() variation used to make the corresponding change on
2003 a process copy (pc) lkb. */
2004
2005 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2006 {
2007 del_lkb(r, lkb);
2008 lkb->lkb_grmode = DLM_LOCK_IV;
2009 /* this unhold undoes the original ref from create_lkb()
2010 so this leads to the lkb being freed */
2011 unhold_lkb(lkb);
2012 }
2013
2014 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2015 {
2016 set_lvb_unlock(r, lkb);
2017 _remove_lock(r, lkb);
2018 }
2019
2020 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2021 {
2022 _remove_lock(r, lkb);
2023 }
2024
2025 /* returns: 0 did nothing
2026 1 moved lock to granted
2027 -1 removed lock */
2028
2029 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2030 {
2031 int rv = 0;
2032
2033 lkb->lkb_rqmode = DLM_LOCK_IV;
2034
2035 switch (lkb->lkb_status) {
2036 case DLM_LKSTS_GRANTED:
2037 break;
2038 case DLM_LKSTS_CONVERT:
2039 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2040 rv = 1;
2041 break;
2042 case DLM_LKSTS_WAITING:
2043 del_lkb(r, lkb);
2044 lkb->lkb_grmode = DLM_LOCK_IV;
2045 /* this unhold undoes the original ref from create_lkb()
2046 so this leads to the lkb being freed */
2047 unhold_lkb(lkb);
2048 rv = -1;
2049 break;
2050 default:
2051 log_print("invalid status for revert %d", lkb->lkb_status);
2052 }
2053 return rv;
2054 }
2055
2056 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
2057 {
2058 return revert_lock(r, lkb);
2059 }
2060
2061 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2062 {
2063 if (lkb->lkb_grmode != lkb->lkb_rqmode) {
2064 lkb->lkb_grmode = lkb->lkb_rqmode;
2065 if (lkb->lkb_status)
2066 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
2067 else
2068 add_lkb(r, lkb, DLM_LKSTS_GRANTED);
2069 }
2070
2071 lkb->lkb_rqmode = DLM_LOCK_IV;
2072 lkb->lkb_highbast = 0;
2073 }
2074
2075 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2076 {
2077 set_lvb_lock(r, lkb);
2078 _grant_lock(r, lkb);
2079 }
2080
2081 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
2082 const struct dlm_message *ms)
2083 {
2084 set_lvb_lock_pc(r, lkb, ms);
2085 _grant_lock(r, lkb);
2086 }
2087
2088 /* called by grant_pending_locks() which means an async grant message must
2089 be sent to the requesting node in addition to granting the lock if the
2090 lkb belongs to a remote node. */
2091
2092 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
2093 {
2094 grant_lock(r, lkb);
2095 if (is_master_copy(lkb))
2096 send_grant(r, lkb);
2097 else
2098 queue_cast(r, lkb, 0);
2099 }
2100
2101 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
2102 change the granted/requested modes. We're munging things accordingly in
2103 the process copy.
2104 CONVDEADLK: our grmode may have been forced down to NL to resolve a
2105 conversion deadlock
2106 ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
2107 compatible with other granted locks */
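/* Minimal sketch (an assumption about the reply path, which lives elsewhere
 * in this file) of how these munge helpers are expected to be applied when a
 * reply carries the corresponding status flags:
 *
 *	if (ms->m_sbflags & cpu_to_le32(DLM_SBF_DEMOTED))
 *		munge_demoted(lkb);
 *	if (ms->m_sbflags & cpu_to_le32(DLM_SBF_ALTMODE))
 *		munge_altmode(lkb, ms);
 *
 * e.g. a PR->EX convert resolved by CONVDEADLK comes back DEMOTED, and the
 * process copy's grmode is forced to NL here as well.
 */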
2108
2109 static void munge_demoted(struct dlm_lkb *lkb)
2110 {
2111 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
2112 log_print("munge_demoted %x invalid modes gr %d rq %d",
2113 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
2114 return;
2115 }
2116
2117 lkb->lkb_grmode = DLM_LOCK_NL;
2118 }
2119
2120 static void munge_altmode(struct dlm_lkb *lkb, const struct dlm_message *ms)
2121 {
2122 if (ms->m_type != cpu_to_le32(DLM_MSG_REQUEST_REPLY) &&
2123 ms->m_type != cpu_to_le32(DLM_MSG_GRANT)) {
2124 log_print("munge_altmode %x invalid reply type %d",
2125 lkb->lkb_id, le32_to_cpu(ms->m_type));
2126 return;
2127 }
2128
2129 if (lkb->lkb_exflags & DLM_LKF_ALTPR)
2130 lkb->lkb_rqmode = DLM_LOCK_PR;
2131 else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
2132 lkb->lkb_rqmode = DLM_LOCK_CW;
2133 else {
2134 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
2135 dlm_print_lkb(lkb);
2136 }
2137 }
2138
2139 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
2140 {
2141 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
2142 lkb_statequeue);
2143 if (lkb->lkb_id == first->lkb_id)
2144 return 1;
2145
2146 return 0;
2147 }
2148
2149 /* Check if the given lkb conflicts with another lkb on the queue. */
2150
2151 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
2152 {
2153 struct dlm_lkb *this;
2154
2155 list_for_each_entry(this, head, lkb_statequeue) {
2156 if (this == lkb)
2157 continue;
2158 if (!modes_compat(this, lkb))
2159 return 1;
2160 }
2161 return 0;
2162 }
2163
2164 /*
2165 * "A conversion deadlock arises with a pair of lock requests in the converting
2166 * queue for one resource. The granted mode of each lock blocks the requested
2167 * mode of the other lock."
2168 *
2169 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
2170 * convert queue from being granted, then deadlk/demote lkb.
2171 *
2172 * Example:
2173 * Granted Queue: empty
2174 * Convert Queue: NL->EX (first lock)
2175 * PR->EX (second lock)
2176 *
2177 * The first lock can't be granted because of the granted mode of the second
2178 * lock and the second lock can't be granted because it's not first in the
2179 * list. We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
2180 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
2181 * flag set and return DEMOTED in the lksb flags.
2182 *
2183 * Originally, this function detected conv-deadlk in a more limited scope:
2184 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
2185 * - if lkb1 was the first entry in the queue (not just earlier), and was
2186 * blocked by the granted mode of lkb2, and there was nothing on the
2187 * granted queue preventing lkb1 from being granted immediately, i.e.
2188 * lkb2 was the only thing preventing lkb1 from being granted.
2189 *
2190 * That second condition meant we'd only say there was conv-deadlk if
2191 * resolving it (by demotion) would lead to the first lock on the convert
2192 * queue being granted right away. It allowed conversion deadlocks to exist
2193 * between locks on the convert queue while they couldn't be granted anyway.
2194 *
2195 * Now, we detect and take action on conversion deadlocks immediately when
2196 * they're created, even if they may not be immediately consequential. If
2197 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
2198 * mode that would prevent lkb1's conversion from being granted, we do a
2199 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
2200 * I think this means that the lkb_is_ahead condition below should always
2201 * be zero, i.e. there will never be conv-deadlk between two locks that are
2202 * both already on the convert queue.
2203 */
2204
2205 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
2206 {
2207 struct dlm_lkb *lkb1;
2208 int lkb_is_ahead = 0;
2209
2210 list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
2211 if (lkb1 == lkb2) {
2212 lkb_is_ahead = 1;
2213 continue;
2214 }
2215
2216 if (!lkb_is_ahead) {
2217 if (!modes_compat(lkb2, lkb1))
2218 return 1;
2219 } else {
2220 if (!modes_compat(lkb2, lkb1) &&
2221 !modes_compat(lkb1, lkb2))
2222 return 1;
2223 }
2224 }
2225 return 0;
2226 }
2227
2228 /*
2229 * Return 1 if the lock can be granted, 0 otherwise.
2230 * Also detect and resolve conversion deadlocks.
2231 *
2232 * lkb is the lock to be granted
2233 *
2234 * now is 1 if the function is being called in the context of the
2235 * immediate request, it is 0 if called later, after the lock has been
2236 * queued.
2237 *
2238 * recover is 1 if dlm_recover_grant() is trying to grant conversions
2239 * after recovery.
2240 *
2241 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
2242 */
2243
2244 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2245 int recover)
2246 {
2247 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
2248
2249 /*
2250 * 6-10: Version 5.4 introduced an option to address the phenomenon of
2251 * a new request for a NL mode lock being blocked.
2252 *
2253 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
2254 * request, then it would be granted. In essence, the use of this flag
2255 * tells the Lock Manager to expedite this request by not considering
2256 * what may be in the CONVERTING or WAITING queues... As of this
2257 * writing, the EXPEDITE flag can be used only with new requests for NL
2258 * mode locks. This flag is not valid for conversion requests.
2259 *
2260 * A shortcut. Earlier checks return an error if EXPEDITE is used in a
2261 * conversion or used with a non-NL requested mode. We also know an
2262 * EXPEDITE request is always granted immediately, so now must always
2263 * be 1. The full condition to grant an expedite request: (now &&
2264 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
2265 * therefore be shortened to just checking the flag.
2266 */
2267
2268 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
2269 return 1;
2270
2271 /*
2272 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
2273 * added to the remaining conditions.
2274 */
2275
2276 if (queue_conflict(&r->res_grantqueue, lkb))
2277 return 0;
2278
2279 /*
2280 * 6-3: By default, a conversion request is immediately granted if the
2281 * requested mode is compatible with the modes of all other granted
2282 * locks
2283 */
2284
2285 if (queue_conflict(&r->res_convertqueue, lkb))
2286 return 0;
2287
2288 /*
2289 * The RECOVER_GRANT flag means dlm_recover_grant() is granting
2290 * locks for a recovered rsb, on which lkb's have been rebuilt.
2291 * The lkb's may have been rebuilt on the queues in a different
2292 * order than they were in on the previous master. So, granting
2293 * queued conversions in order after recovery doesn't make sense
2294 * since the order hasn't been preserved anyway. The new order
2295 * could also have created a new "in place" conversion deadlock.
2296 * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
2297 * After recovery, there would be no granted locks, and possibly
2298 * NL->EX, PR->EX, an in-place conversion deadlock.) So, after
2299 * recovery, grant conversions without considering order.
2300 */
2301
2302 if (conv && recover)
2303 return 1;
2304
2305 /*
2306 * 6-5: But the default algorithm for deciding whether to grant or
2307 * queue conversion requests does not by itself guarantee that such
2308 * requests are serviced on a "first come first serve" basis. This, in
2309 * turn, can lead to a phenomenon known as "indefinite postponement".
2310 *
2311 * 6-7: This issue is dealt with by using the optional QUECVT flag with
2312 * the system service employed to request a lock conversion. This flag
2313 * forces certain conversion requests to be queued, even if they are
2314 * compatible with the granted modes of other locks on the same
2315 * resource. Thus, the use of this flag results in conversion requests
2316 * being ordered on a "first come first serve" basis.
2317 *
2318 * DCT: This condition is all about new conversions being able to occur
2319 * "in place" while the lock remains on the granted queue (assuming
2320 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion
2321 * doesn't _have_ to go onto the convert queue where it's processed in
2322 * order. The "now" variable is necessary to distinguish converts
2323 * being received and processed for the first time now, because once a
2324 * convert is moved to the conversion queue the condition below applies
2325 * requiring fifo granting.
2326 */
2327
2328 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
2329 return 1;
2330
2331 /*
2332 * Even if the convert is compat with all granted locks,
2333 * QUECVT forces it behind other locks on the convert queue.
2334 */
2335
2336 if (now && conv && (lkb->lkb_exflags & DLM_LKF_QUECVT)) {
2337 if (list_empty(&r->res_convertqueue))
2338 return 1;
2339 else
2340 return 0;
2341 }
2342
2343 /*
2344 * The NOORDER flag is set to avoid the standard vms rules on grant
2345 * order.
2346 */
2347
2348 if (lkb->lkb_exflags & DLM_LKF_NOORDER)
2349 return 1;
2350
2351 /*
2352 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
2353 * granted until all other conversion requests ahead of it are granted
2354 * and/or canceled.
2355 */
2356
2357 if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
2358 return 1;
2359
2360 /*
2361 * 6-4: By default, a new request is immediately granted only if all
2362 * three of the following conditions are satisfied when the request is
2363 * issued:
2364 * - The queue of ungranted conversion requests for the resource is
2365 * empty.
2366 * - The queue of ungranted new requests for the resource is empty.
2367 * - The mode of the new request is compatible with the most
2368 * restrictive mode of all granted locks on the resource.
2369 */
2370
2371 if (now && !conv && list_empty(&r->res_convertqueue) &&
2372 list_empty(&r->res_waitqueue))
2373 return 1;
2374
2375 /*
2376 * 6-4: Once a lock request is in the queue of ungranted new requests,
2377 * it cannot be granted until the queue of ungranted conversion
2378 * requests is empty, all ungranted new requests ahead of it are
2379 * granted and/or canceled, and it is compatible with the granted mode
2380 * of the most restrictive lock granted on the resource.
2381 */
2382
2383 if (!now && !conv && list_empty(&r->res_convertqueue) &&
2384 first_in_list(lkb, &r->res_waitqueue))
2385 return 1;
2386
2387 return 0;
2388 }
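/* Worked examples (illustrative only) of the grant rules above:
 * - a new PR request (now=1, conv=0) against a granted CW lock: PR and CW
 *   conflict, so queue_conflict() on the grant queue fails it.
 * - a new PR request against only a granted PR lock, with empty convert and
 *   wait queues: the 6-4 rule grants it immediately.
 * - a queued PR->EX convert (now=0, conv=1): it is granted only once it is
 *   first on the convert queue and nothing granted or converting conflicts
 *   with EX.
 */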
2389
2390 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
2391 int recover, int *err)
2392 {
2393 int rv;
2394 int8_t alt = 0, rqmode = lkb->lkb_rqmode;
2395 int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
2396
2397 if (err)
2398 *err = 0;
2399
2400 rv = _can_be_granted(r, lkb, now, recover);
2401 if (rv)
2402 goto out;
2403
2404 /*
2405 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
2406 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
2407 * cancels one of the locks.
2408 */
2409
2410 if (is_convert && can_be_queued(lkb) &&
2411 conversion_deadlock_detect(r, lkb)) {
2412 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
2413 lkb->lkb_grmode = DLM_LOCK_NL;
2414 set_bit(DLM_SBF_DEMOTED_BIT, &lkb->lkb_sbflags);
2415 } else if (err) {
2416 *err = -EDEADLK;
2417 } else {
2418 log_print("can_be_granted deadlock %x now %d",
2419 lkb->lkb_id, now);
2420 dlm_dump_rsb(r);
2421 }
2422 goto out;
2423 }
2424
2425 /*
2426 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
2427 * to grant a request in a mode other than the normal rqmode. It's a
2428 * simple way to provide a big optimization to applications that can
2429 * use them.
2430 */
2431
2432 if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
2433 alt = DLM_LOCK_PR;
2434 else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
2435 alt = DLM_LOCK_CW;
2436
2437 if (alt) {
2438 lkb->lkb_rqmode = alt;
2439 rv = _can_be_granted(r, lkb, now, 0);
2440 if (rv)
2441 set_bit(DLM_SBF_ALTMODE_BIT, &lkb->lkb_sbflags);
2442 else
2443 lkb->lkb_rqmode = rqmode;
2444 }
2445 out:
2446 return rv;
2447 }
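/* Worked example (illustrative only) of the ALT* fallback above: a CW request
 * made with DLM_LKF_ALTPR against a resource holding a granted PR lock fails
 * _can_be_granted() (CW conflicts with PR); the rqmode is then retried as PR,
 * which is compatible, so the lock is granted in PR mode and DLM_SBF_ALTMODE
 * is reported back in the lksb status flags.
 */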
2448
2449 /* Returns the highest requested mode of all blocked conversions; sets
2450 cw if there's a blocked conversion to DLM_LOCK_CW. */
2451
2452 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
2453 unsigned int *count)
2454 {
2455 struct dlm_lkb *lkb, *s;
2456 int recover = rsb_flag(r, RSB_RECOVER_GRANT);
2457 int hi, demoted, quit, grant_restart, demote_restart;
2458 int deadlk;
2459
2460 quit = 0;
2461 restart:
2462 grant_restart = 0;
2463 demote_restart = 0;
2464 hi = DLM_LOCK_IV;
2465
2466 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
2467 demoted = is_demoted(lkb);
2468 deadlk = 0;
2469
2470 if (can_be_granted(r, lkb, 0, recover, &deadlk)) {
2471 grant_lock_pending(r, lkb);
2472 grant_restart = 1;
2473 if (count)
2474 (*count)++;
2475 continue;
2476 }
2477
2478 if (!demoted && is_demoted(lkb)) {
2479 log_print("WARN: pending demoted %x node %d %s",
2480 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
2481 demote_restart = 1;
2482 continue;
2483 }
2484
2485 if (deadlk) {
2486 /*
2487 * If the DLM_LKF_NODLCKWT flag is set and a conversion
2488 * deadlock is detected, we queue a blocking AST and leave it
2489 * to the application to demote (or cancel) the conversion.
2490 */
2491 if (lkb->lkb_exflags & DLM_LKF_NODLCKWT) {
2492 if (lkb->lkb_highbast < lkb->lkb_rqmode) {
2493 queue_bast(r, lkb, lkb->lkb_rqmode);
2494 lkb->lkb_highbast = lkb->lkb_rqmode;
2495 }
2496 } else {
2497 log_print("WARN: pending deadlock %x node %d %s",
2498 lkb->lkb_id, lkb->lkb_nodeid,
2499 r->res_name);
2500 dlm_dump_rsb(r);
2501 }
2502 continue;
2503 }
2504
2505 hi = max_t(int, lkb->lkb_rqmode, hi);
2506
2507 if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
2508 *cw = 1;
2509 }
2510
2511 if (grant_restart)
2512 goto restart;
2513 if (demote_restart && !quit) {
2514 quit = 1;
2515 goto restart;
2516 }
2517
2518 return max_t(int, high, hi);
2519 }
2520
2521 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
2522 unsigned int *count)
2523 {
2524 struct dlm_lkb *lkb, *s;
2525
2526 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
2527 if (can_be_granted(r, lkb, 0, 0, NULL)) {
2528 grant_lock_pending(r, lkb);
2529 if (count)
2530 (*count)++;
2531 } else {
2532 high = max_t(int, lkb->lkb_rqmode, high);
2533 if (lkb->lkb_rqmode == DLM_LOCK_CW)
2534 *cw = 1;
2535 }
2536 }
2537
2538 return high;
2539 }
2540
2541 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
2542 on either the convert or waiting queue.
2543 high is the largest rqmode of all locks blocked on the convert or
2544 waiting queue. */
2545
2546 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
2547 {
2548 if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
2549 if (gr->lkb_highbast < DLM_LOCK_EX)
2550 return 1;
2551 return 0;
2552 }
2553
2554 if (gr->lkb_highbast < high &&
2555 !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
2556 return 1;
2557 return 0;
2558 }
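/* Worked example (illustrative only): a lock granted in CR while an EX
 * request is blocked on the resource (high = EX): CR and EX are incompatible
 * and lkb_highbast is still below EX, so the CR holder gets a blocking AST
 * for mode EX. The PR/CW special case above covers a granted PR lock when a
 * CW request is blocked: "high" alone may not force a bast to the PR holder,
 * but the blocked CW request still needs it notified.
 */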
2559
2560 static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
2561 {
2562 struct dlm_lkb *lkb, *s;
2563 int high = DLM_LOCK_IV;
2564 int cw = 0;
2565
2566 if (!is_master(r)) {
2567 log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
2568 dlm_dump_rsb(r);
2569 return;
2570 }
2571
2572 high = grant_pending_convert(r, high, &cw, count);
2573 high = grant_pending_wait(r, high, &cw, count);
2574
2575 if (high == DLM_LOCK_IV)
2576 return;
2577
2578 /*
2579 * If there are locks left on the wait/convert queue then send blocking
2580 * ASTs to granted locks based on the largest requested mode (high)
2581 * found above.
2582 */
2583
2584 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
2585 if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
2586 if (cw && high == DLM_LOCK_PR &&
2587 lkb->lkb_grmode == DLM_LOCK_PR)
2588 queue_bast(r, lkb, DLM_LOCK_CW);
2589 else
2590 queue_bast(r, lkb, high);
2591 lkb->lkb_highbast = high;
2592 }
2593 }
2594 }
2595
2596 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
2597 {
2598 if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
2599 (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
2600 if (gr->lkb_highbast < DLM_LOCK_EX)
2601 return 1;
2602 return 0;
2603 }
2604
2605 if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
2606 return 1;
2607 return 0;
2608 }
2609
2610 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
2611 struct dlm_lkb *lkb)
2612 {
2613 struct dlm_lkb *gr;
2614
2615 list_for_each_entry(gr, head, lkb_statequeue) {
2616 /* skip self when sending basts to convertqueue */
2617 if (gr == lkb)
2618 continue;
2619 if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
2620 queue_bast(r, gr, lkb->lkb_rqmode);
2621 gr->lkb_highbast = lkb->lkb_rqmode;
2622 }
2623 }
2624 }
2625
2626 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
2627 {
2628 send_bast_queue(r, &r->res_grantqueue, lkb);
2629 }
2630
2631 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
2632 {
2633 send_bast_queue(r, &r->res_grantqueue, lkb);
2634 send_bast_queue(r, &r->res_convertqueue, lkb);
2635 }
2636
2637 /* set_master(r, lkb) -- set the master nodeid of a resource
2638
2639 The purpose of this function is to set the nodeid field in the given
2640 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
2641 known, it can just be copied to the lkb and the function will return
2642 0. If the rsb's nodeid is _not_ known, it needs to be looked up
2643 before it can be copied to the lkb.
2644
2645 When the rsb nodeid is being looked up remotely, the initial lkb
2646 causing the lookup is kept on the ls_waiters list waiting for the
2647 lookup reply. Other lkb's waiting for the same rsb lookup are kept
2648 on the rsb's res_lookup list until the master is verified.
2649
2650 Return values:
2651 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
2652 1: the rsb master is not available and the lkb has been placed on
2653 a wait queue
2654 */
2655
2656 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
2657 {
2658 int our_nodeid = dlm_our_nodeid();
2659
2660 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
2661 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
2662 r->res_first_lkid = lkb->lkb_id;
2663 lkb->lkb_nodeid = r->res_nodeid;
2664 return 0;
2665 }
2666
2667 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
2668 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2669 return 1;
2670 }
2671
2672 if (r->res_master_nodeid == our_nodeid) {
2673 lkb->lkb_nodeid = 0;
2674 return 0;
2675 }
2676
2677 if (r->res_master_nodeid) {
2678 lkb->lkb_nodeid = r->res_master_nodeid;
2679 return 0;
2680 }
2681
2682 if (dlm_dir_nodeid(r) == our_nodeid) {
2683 /* This is a somewhat unusual case; find_rsb will usually
2684 have set res_master_nodeid when dir nodeid is local, but
2685 there are cases where we become the dir node after we've
2686 passed find_rsb and go through _request_lock again.
2687 confirm_master() or process_lookup_list() needs to be
2688 called after this. */
2689 log_debug(r->res_ls, "set_master %x self master %d dir %d %s",
2690 lkb->lkb_id, r->res_master_nodeid, r->res_dir_nodeid,
2691 r->res_name);
2692 r->res_master_nodeid = our_nodeid;
2693 r->res_nodeid = 0;
2694 lkb->lkb_nodeid = 0;
2695 return 0;
2696 }
2697
2698 r->res_first_lkid = lkb->lkb_id;
2699 send_lookup(r, lkb);
2700 return 1;
2701 }
2702
2703 static void process_lookup_list(struct dlm_rsb *r)
2704 {
2705 struct dlm_lkb *lkb, *safe;
2706
2707 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2708 list_del_init(&lkb->lkb_rsb_lookup);
2709 _request_lock(r, lkb);
2710 }
2711 }
2712
2713 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2714
2715 static void confirm_master(struct dlm_rsb *r, int error)
2716 {
2717 struct dlm_lkb *lkb;
2718
2719 if (!r->res_first_lkid)
2720 return;
2721
2722 switch (error) {
2723 case 0:
2724 case -EINPROGRESS:
2725 r->res_first_lkid = 0;
2726 process_lookup_list(r);
2727 break;
2728
2729 case -EAGAIN:
2730 case -EBADR:
2731 case -ENOTBLK:
2732 /* the remote request failed and won't be retried (it was
2733 a NOQUEUE, or has been canceled/unlocked); make a waiting
2734 lkb the first_lkid */
2735
2736 r->res_first_lkid = 0;
2737
2738 if (!list_empty(&r->res_lookup)) {
2739 lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2740 lkb_rsb_lookup);
2741 list_del_init(&lkb->lkb_rsb_lookup);
2742 r->res_first_lkid = lkb->lkb_id;
2743 _request_lock(r, lkb);
2744 }
2745 break;
2746
2747 default:
2748 log_error(r->res_ls, "confirm_master unknown error %d", error);
2749 }
2750 }
2751
2752 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2753 int namelen, void (*ast)(void *astparam),
2754 void *astparam,
2755 void (*bast)(void *astparam, int mode),
2756 struct dlm_args *args)
2757 {
2758 int rv = -EINVAL;
2759
2760 /* check for invalid arg usage */
2761
2762 if (mode < 0 || mode > DLM_LOCK_EX)
2763 goto out;
2764
2765 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2766 goto out;
2767
2768 if (flags & DLM_LKF_CANCEL)
2769 goto out;
2770
2771 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2772 goto out;
2773
2774 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2775 goto out;
2776
2777 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2778 goto out;
2779
2780 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2781 goto out;
2782
2783 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2784 goto out;
2785
2786 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2787 goto out;
2788
2789 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2790 goto out;
2791
2792 if (!ast || !lksb)
2793 goto out;
2794
2795 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2796 goto out;
2797
2798 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2799 goto out;
2800
2801 /* these args will be copied to the lkb in validate_lock_args,
2802 it cannot be done now because when converting locks, fields in
2803 an active lkb cannot be modified before locking the rsb */
2804
2805 args->flags = flags;
2806 args->astfn = ast;
2807 args->astparam = astparam;
2808 args->bastfn = bast;
2809 args->mode = mode;
2810 args->lksb = lksb;
2811 rv = 0;
2812 out:
2813 return rv;
2814 }
2815
2816 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2817 {
2818 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2819 DLM_LKF_FORCEUNLOCK))
2820 return -EINVAL;
2821
2822 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2823 return -EINVAL;
2824
2825 args->flags = flags;
2826 args->astparam = astarg;
2827 return 0;
2828 }
2829
2830 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2831 struct dlm_args *args)
2832 {
2833 int rv = -EBUSY;
2834
2835 if (args->flags & DLM_LKF_CONVERT) {
2836 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2837 goto out;
2838
2839 /* lock not allowed if there's any op in progress */
2840 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2841 goto out;
2842
2843 if (is_overlap(lkb))
2844 goto out;
2845
2846 rv = -EINVAL;
2847 if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags))
2848 goto out;
2849
2850 if (args->flags & DLM_LKF_QUECVT &&
2851 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2852 goto out;
2853 }
2854
2855 lkb->lkb_exflags = args->flags;
2856 dlm_set_sbflags_val(lkb, 0);
2857 lkb->lkb_astfn = args->astfn;
2858 lkb->lkb_astparam = args->astparam;
2859 lkb->lkb_bastfn = args->bastfn;
2860 lkb->lkb_rqmode = args->mode;
2861 lkb->lkb_lksb = args->lksb;
2862 lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2863 lkb->lkb_ownpid = (int) current->pid;
2864 rv = 0;
2865 out:
2866 switch (rv) {
2867 case 0:
2868 break;
2869 case -EINVAL:
2870 /* annoy the user because dlm usage is wrong */
2871 WARN_ON(1);
2872 log_error(ls, "%s %d %x %x %x %d %d", __func__,
2873 rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2874 lkb->lkb_status, lkb->lkb_wait_type);
2875 break;
2876 default:
2877 log_debug(ls, "%s %d %x %x %x %d %d", __func__,
2878 rv, lkb->lkb_id, dlm_iflags_val(lkb), args->flags,
2879 lkb->lkb_status, lkb->lkb_wait_type);
2880 break;
2881 }
2882
2883 return rv;
2884 }
2885
2886 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2887 for success */
2888
2889 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2890 because there may be a lookup in progress and it's valid to do
2891 cancel/unlockf on it */
2892
2893 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2894 {
2895 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2896 int rv = -EBUSY;
2897
2898 /* normal unlock not allowed if there's any op in progress */
2899 if (!(args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) &&
2900 (lkb->lkb_wait_type || lkb->lkb_wait_count))
2901 goto out;
2902
2903 /* an lkb may be waiting for an rsb lookup to complete where the
2904 lookup was initiated by another lock */
2905
2906 if (!list_empty(&lkb->lkb_rsb_lookup)) {
2907 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2908 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2909 list_del_init(&lkb->lkb_rsb_lookup);
2910 queue_cast(lkb->lkb_resource, lkb,
2911 args->flags & DLM_LKF_CANCEL ?
2912 -DLM_ECANCEL : -DLM_EUNLOCK);
2913 unhold_lkb(lkb); /* undoes create_lkb() */
2914 }
2915 /* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2916 goto out;
2917 }
2918
2919 rv = -EINVAL;
2920 if (test_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags)) {
2921 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2922 dlm_print_lkb(lkb);
2923 goto out;
2924 }
2925
2926 /* an lkb may still exist even though the lock is EOL'ed due to a
2927 * cancel, unlock or failed noqueue request; an app can't use these
2928 * locks; return same error as if the lkid had not been found at all
2929 */
2930
2931 if (test_bit(DLM_IFL_ENDOFLIFE_BIT, &lkb->lkb_iflags)) {
2932 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2933 rv = -ENOENT;
2934 goto out;
2935 }
2936
2937 if (is_overlap_unlock(lkb))
2938 goto out;
2939
2940 /* cancel not allowed with another cancel/unlock in progress */
2941
2942 if (args->flags & DLM_LKF_CANCEL) {
2943 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2944 goto out;
2945
2946 if (is_overlap_cancel(lkb))
2947 goto out;
2948
2949 if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2950 set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2951 rv = -EBUSY;
2952 goto out;
2953 }
2954
2955 /* there's nothing to cancel */
2956 if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2957 !lkb->lkb_wait_type) {
2958 rv = -EBUSY;
2959 goto out;
2960 }
2961
2962 switch (lkb->lkb_wait_type) {
2963 case DLM_MSG_LOOKUP:
2964 case DLM_MSG_REQUEST:
2965 set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
2966 rv = -EBUSY;
2967 goto out;
2968 case DLM_MSG_UNLOCK:
2969 case DLM_MSG_CANCEL:
2970 goto out;
2971 }
2972 /* add_to_waiters() will set OVERLAP_CANCEL */
2973 goto out_ok;
2974 }
2975
2976 /* do we need to allow a force-unlock if there's a normal unlock
2977 already in progress? in what conditions could the normal unlock
2978 fail such that we'd want to send a force-unlock to be sure? */
2979
2980 if (args->flags & DLM_LKF_FORCEUNLOCK) {
2981 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2982 goto out;
2983
2984 if (test_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags)) {
2985 set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2986 rv = -EBUSY;
2987 goto out;
2988 }
2989
2990 switch (lkb->lkb_wait_type) {
2991 case DLM_MSG_LOOKUP:
2992 case DLM_MSG_REQUEST:
2993 set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
2994 rv = -EBUSY;
2995 goto out;
2996 case DLM_MSG_UNLOCK:
2997 goto out;
2998 }
2999 /* add_to_waiters() will set OVERLAP_UNLOCK */
3000 }
3001
3002 out_ok:
3003 /* an overlapping op shouldn't blow away exflags from other op */
3004 lkb->lkb_exflags |= args->flags;
3005 dlm_set_sbflags_val(lkb, 0);
3006 lkb->lkb_astparam = args->astparam;
3007 rv = 0;
3008 out:
3009 switch (rv) {
3010 case 0:
3011 break;
3012 case -EINVAL:
3013 /* annoy the user because dlm usage is wrong */
3014 WARN_ON(1);
3015 log_error(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
3016 lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
3017 args->flags, lkb->lkb_wait_type,
3018 lkb->lkb_resource->res_name);
3019 break;
3020 default:
3021 log_debug(ls, "%s %d %x %x %x %x %d %s", __func__, rv,
3022 lkb->lkb_id, dlm_iflags_val(lkb), lkb->lkb_exflags,
3023 args->flags, lkb->lkb_wait_type,
3024 lkb->lkb_resource->res_name);
3025 break;
3026 }
3027
3028 return rv;
3029 }
3030
3031 /*
3032 * Four stage 4 varieties:
3033 * do_request(), do_convert(), do_unlock(), do_cancel()
3034 * These are called on the master node for the given lock and
3035 * from the central locking logic.
3036 */
3037
3038 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3039 {
3040 int error = 0;
3041
3042 if (can_be_granted(r, lkb, 1, 0, NULL)) {
3043 grant_lock(r, lkb);
3044 queue_cast(r, lkb, 0);
3045 goto out;
3046 }
3047
3048 if (can_be_queued(lkb)) {
3049 error = -EINPROGRESS;
3050 add_lkb(r, lkb, DLM_LKSTS_WAITING);
3051 goto out;
3052 }
3053
3054 error = -EAGAIN;
3055 queue_cast(r, lkb, -EAGAIN);
3056 out:
3057 return error;
3058 }
3059
3060 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3061 int error)
3062 {
3063 switch (error) {
3064 case -EAGAIN:
3065 if (force_blocking_asts(lkb))
3066 send_blocking_asts_all(r, lkb);
3067 break;
3068 case -EINPROGRESS:
3069 send_blocking_asts(r, lkb);
3070 break;
3071 }
3072 }
3073
3074 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3075 {
3076 int error = 0;
3077 int deadlk = 0;
3078
3079 /* changing an existing lock may allow others to be granted */
3080
3081 if (can_be_granted(r, lkb, 1, 0, &deadlk)) {
3082 grant_lock(r, lkb);
3083 queue_cast(r, lkb, 0);
3084 goto out;
3085 }
3086
3087 /* can_be_granted() detected that this lock would block in a conversion
3088 deadlock, so we leave it on the granted queue and return EDEADLK in
3089 the ast for the convert. */
3090
3091 if (deadlk && !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
3092 /* it's left on the granted queue */
3093 revert_lock(r, lkb);
3094 queue_cast(r, lkb, -EDEADLK);
3095 error = -EDEADLK;
3096 goto out;
3097 }
3098
3099 /* is_demoted() means the can_be_granted() above set the grmode
3100 to NL, and left us on the granted queue. This auto-demotion
3101 (due to CONVDEADLK) might mean other locks, and/or this lock, are
3102 now grantable. We have to try to grant other converting locks
3103 before we try again to grant this one. */
3104
3105 if (is_demoted(lkb)) {
3106 grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
3107 if (_can_be_granted(r, lkb, 1, 0)) {
3108 grant_lock(r, lkb);
3109 queue_cast(r, lkb, 0);
3110 goto out;
3111 }
3112 /* else fall through and move to convert queue */
3113 }
3114
3115 if (can_be_queued(lkb)) {
3116 error = -EINPROGRESS;
3117 del_lkb(r, lkb);
3118 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3119 goto out;
3120 }
3121
3122 error = -EAGAIN;
3123 queue_cast(r, lkb, -EAGAIN);
3124 out:
3125 return error;
3126 }
3127
3128 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3129 int error)
3130 {
3131 switch (error) {
3132 case 0:
3133 grant_pending_locks(r, NULL);
3134 /* grant_pending_locks also sends basts */
3135 break;
3136 case -EAGAIN:
3137 if (force_blocking_asts(lkb))
3138 send_blocking_asts_all(r, lkb);
3139 break;
3140 case -EINPROGRESS:
3141 send_blocking_asts(r, lkb);
3142 break;
3143 }
3144 }
3145
3146 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3147 {
3148 remove_lock(r, lkb);
3149 queue_cast(r, lkb, -DLM_EUNLOCK);
3150 return -DLM_EUNLOCK;
3151 }
3152
3153 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3154 int error)
3155 {
3156 grant_pending_locks(r, NULL);
3157 }
3158
3159 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
3160
3161 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3162 {
3163 int error;
3164
3165 error = revert_lock(r, lkb);
3166 if (error) {
3167 queue_cast(r, lkb, -DLM_ECANCEL);
3168 return -DLM_ECANCEL;
3169 }
3170 return 0;
3171 }
3172
3173 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
3174 int error)
3175 {
3176 if (error)
3177 grant_pending_locks(r, NULL);
3178 }
3179
3180 /*
3181 * Four stage 3 varieties:
3182 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
3183 */
3184
3185 /* add a new lkb to a possibly new rsb, called by requesting process */
3186
3187 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3188 {
3189 int error;
3190
3191 /* set_master: sets lkb nodeid from r */
3192
3193 error = set_master(r, lkb);
3194 if (error < 0)
3195 goto out;
3196 if (error) {
3197 error = 0;
3198 goto out;
3199 }
3200
3201 if (is_remote(r)) {
3202 /* receive_request() calls do_request() on remote node */
3203 error = send_request(r, lkb);
3204 } else {
3205 error = do_request(r, lkb);
3206 /* for remote locks the request_reply is sent
3207 between do_request and do_request_effects */
3208 do_request_effects(r, lkb, error);
3209 }
3210 out:
3211 return error;
3212 }
3213
3214 /* change some property of an existing lkb, e.g. mode */
3215
3216 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3217 {
3218 int error;
3219
3220 if (is_remote(r)) {
3221 /* receive_convert() calls do_convert() on remote node */
3222 error = send_convert(r, lkb);
3223 } else {
3224 error = do_convert(r, lkb);
3225 /* for remote locks the convert_reply is sent
3226 between do_convert and do_convert_effects */
3227 do_convert_effects(r, lkb, error);
3228 }
3229
3230 return error;
3231 }
3232
3233 /* remove an existing lkb from the granted queue */
3234
3235 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3236 {
3237 int error;
3238
3239 if (is_remote(r)) {
3240 /* receive_unlock() calls do_unlock() on remote node */
3241 error = send_unlock(r, lkb);
3242 } else {
3243 error = do_unlock(r, lkb);
3244 /* for remote locks the unlock_reply is sent
3245 between do_unlock and do_unlock_effects */
3246 do_unlock_effects(r, lkb, error);
3247 }
3248
3249 return error;
3250 }
3251
3252 /* remove an existing lkb from the convert or wait queue */
3253
3254 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3255 {
3256 int error;
3257
3258 if (is_remote(r)) {
3259 /* receive_cancel() calls do_cancel() on remote node */
3260 error = send_cancel(r, lkb);
3261 } else {
3262 error = do_cancel(r, lkb);
3263 /* for remote locks the cancel_reply is sent
3264 between do_cancel and do_cancel_effects */
3265 do_cancel_effects(r, lkb, error);
3266 }
3267
3268 return error;
3269 }
3270
3271 /*
3272 * Four stage 2 varieties:
3273 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
3274 */
3275
3276 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3277 const void *name, int len,
3278 struct dlm_args *args)
3279 {
3280 struct dlm_rsb *r;
3281 int error;
3282
3283 error = validate_lock_args(ls, lkb, args);
3284 if (error)
3285 return error;
3286
3287 error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
3288 if (error)
3289 return error;
3290
3291 lock_rsb(r);
3292
3293 attach_lkb(r, lkb);
3294 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
3295
3296 error = _request_lock(r, lkb);
3297
3298 unlock_rsb(r);
3299 put_rsb(r);
3300 return error;
3301 }
3302
3303 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3304 struct dlm_args *args)
3305 {
3306 struct dlm_rsb *r;
3307 int error;
3308
3309 r = lkb->lkb_resource;
3310
3311 hold_rsb(r);
3312 lock_rsb(r);
3313
3314 error = validate_lock_args(ls, lkb, args);
3315 if (error)
3316 goto out;
3317
3318 error = _convert_lock(r, lkb);
3319 out:
3320 unlock_rsb(r);
3321 put_rsb(r);
3322 return error;
3323 }
3324
3325 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3326 struct dlm_args *args)
3327 {
3328 struct dlm_rsb *r;
3329 int error;
3330
3331 r = lkb->lkb_resource;
3332
3333 hold_rsb(r);
3334 lock_rsb(r);
3335
3336 error = validate_unlock_args(lkb, args);
3337 if (error)
3338 goto out;
3339
3340 error = _unlock_lock(r, lkb);
3341 out:
3342 unlock_rsb(r);
3343 put_rsb(r);
3344 return error;
3345 }
3346
3347 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
3348 struct dlm_args *args)
3349 {
3350 struct dlm_rsb *r;
3351 int error;
3352
3353 r = lkb->lkb_resource;
3354
3355 hold_rsb(r);
3356 lock_rsb(r);
3357
3358 error = validate_unlock_args(lkb, args);
3359 if (error)
3360 goto out;
3361
3362 error = _cancel_lock(r, lkb);
3363 out:
3364 unlock_rsb(r);
3365 put_rsb(r);
3366 return error;
3367 }
3368
3369 /*
3370 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
3371 */
3372
3373 int dlm_lock(dlm_lockspace_t *lockspace,
3374 int mode,
3375 struct dlm_lksb *lksb,
3376 uint32_t flags,
3377 const void *name,
3378 unsigned int namelen,
3379 uint32_t parent_lkid,
3380 void (*ast) (void *astarg),
3381 void *astarg,
3382 void (*bast) (void *astarg, int mode))
3383 {
3384 struct dlm_ls *ls;
3385 struct dlm_lkb *lkb;
3386 struct dlm_args args;
3387 int error, convert = flags & DLM_LKF_CONVERT;
3388
3389 ls = dlm_find_lockspace_local(lockspace);
3390 if (!ls)
3391 return -EINVAL;
3392
3393 dlm_lock_recovery(ls);
3394
3395 if (convert)
3396 error = find_lkb(ls, lksb->sb_lkid, &lkb);
3397 else
3398 error = create_lkb(ls, &lkb);
3399
3400 if (error)
3401 goto out;
3402
3403 trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
3404
3405 error = set_lock_args(mode, lksb, flags, namelen, ast, astarg, bast,
3406 &args);
3407 if (error)
3408 goto out_put;
3409
3410 if (convert)
3411 error = convert_lock(ls, lkb, &args);
3412 else
3413 error = request_lock(ls, lkb, name, namelen, &args);
3414
3415 if (error == -EINPROGRESS)
3416 error = 0;
3417 out_put:
3418 trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, true);
3419
3420 if (convert || error)
3421 __put_lkb(ls, lkb);
3422 if (error == -EAGAIN || error == -EDEADLK)
3423 error = 0;
3424 out:
3425 dlm_unlock_recovery(ls);
3426 dlm_put_lockspace(ls);
3427 return error;
3428 }
3429
3430 int dlm_unlock(dlm_lockspace_t *lockspace,
3431 uint32_t lkid,
3432 uint32_t flags,
3433 struct dlm_lksb *lksb,
3434 void *astarg)
3435 {
3436 struct dlm_ls *ls;
3437 struct dlm_lkb *lkb;
3438 struct dlm_args args;
3439 int error;
3440
3441 ls = dlm_find_lockspace_local(lockspace);
3442 if (!ls)
3443 return -EINVAL;
3444
3445 dlm_lock_recovery(ls);
3446
3447 error = find_lkb(ls, lkid, &lkb);
3448 if (error)
3449 goto out;
3450
3451 trace_dlm_unlock_start(ls, lkb, flags);
3452
3453 error = set_unlock_args(flags, astarg, &args);
3454 if (error)
3455 goto out_put;
3456
3457 if (flags & DLM_LKF_CANCEL)
3458 error = cancel_lock(ls, lkb, &args);
3459 else
3460 error = unlock_lock(ls, lkb, &args);
3461
3462 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
3463 error = 0;
3464 if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
3465 error = 0;
3466 out_put:
3467 trace_dlm_unlock_end(ls, lkb, flags, error);
3468
3469 dlm_put_lkb(lkb);
3470 out:
3471 dlm_unlock_recovery(ls);
3472 dlm_put_lockspace(ls);
3473 return error;
3474 }
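/* Minimal usage sketch (illustrative only, not part of this file): how a
 * kernel caller might acquire and release an EX lock through the two stage 1
 * entry points above. The lockspace handle, lksb storage and completion
 * handling are assumptions of the example; completion is reported through
 * the ast callback via lksb->sb_status.
 *
 *	static void example_ast(void *astarg)
 *	{
 *		struct dlm_lksb *lksb = astarg;
 *
 *		pr_info("dlm op done, status %d lkid %x\n",
 *			lksb->sb_status, lksb->sb_lkid);
 *	}
 *
 *	static int example_lock_unlock(dlm_lockspace_t *ls,
 *				       struct dlm_lksb *lksb)
 *	{
 *		int error;
 *
 *		error = dlm_lock(ls, DLM_LOCK_EX, lksb, 0, "example_resource",
 *				 16, 0, example_ast, lksb, NULL);
 *		if (error)
 *			return error;
 *
 *		(wait for example_ast to observe sb_status == 0, then release)
 *		return dlm_unlock(ls, lksb->sb_lkid, 0, lksb, lksb);
 *	}
 */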
3475
3476 /*
3477 * send/receive routines for remote operations and replies
3478 *
3479 * send_args
3480 * send_common
3481 * send_request receive_request
3482 * send_convert receive_convert
3483 * send_unlock receive_unlock
3484 * send_cancel receive_cancel
3485 * send_grant receive_grant
3486 * send_bast receive_bast
3487 * send_lookup receive_lookup
3488 * send_remove receive_remove
3489 *
3490 * send_common_reply
3491 * receive_request_reply send_request_reply
3492 * receive_convert_reply send_convert_reply
3493 * receive_unlock_reply send_unlock_reply
3494 * receive_cancel_reply send_cancel_reply
3495 * receive_lookup_reply send_lookup_reply
3496 */
3497
3498 static int _create_message(struct dlm_ls *ls, int mb_len,
3499 int to_nodeid, int mstype,
3500 struct dlm_message **ms_ret,
3501 struct dlm_mhandle **mh_ret)
3502 {
3503 struct dlm_message *ms;
3504 struct dlm_mhandle *mh;
3505 char *mb;
3506
3507 /* get_buffer gives us a message handle (mh) that we need to
3508 pass into midcomms_commit and a message buffer (mb) that we
3509 write our data into */
3510
3511 mh = dlm_midcomms_get_mhandle(to_nodeid, mb_len, &mb);
3512 if (!mh)
3513 return -ENOBUFS;
3514
3515 ms = (struct dlm_message *) mb;
3516
3517 ms->m_header.h_version = cpu_to_le32(DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
3518 ms->m_header.u.h_lockspace = cpu_to_le32(ls->ls_global_id);
3519 ms->m_header.h_nodeid = cpu_to_le32(dlm_our_nodeid());
3520 ms->m_header.h_length = cpu_to_le16(mb_len);
3521 ms->m_header.h_cmd = DLM_MSG;
3522
3523 ms->m_type = cpu_to_le32(mstype);
3524
3525 *mh_ret = mh;
3526 *ms_ret = ms;
3527 return 0;
3528 }
3529
3530 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
3531 int to_nodeid, int mstype,
3532 struct dlm_message **ms_ret,
3533 struct dlm_mhandle **mh_ret)
3534 {
3535 int mb_len = sizeof(struct dlm_message);
3536
3537 switch (mstype) {
3538 case DLM_MSG_REQUEST:
3539 case DLM_MSG_LOOKUP:
3540 case DLM_MSG_REMOVE:
3541 mb_len += r->res_length;
3542 break;
3543 case DLM_MSG_CONVERT:
3544 case DLM_MSG_UNLOCK:
3545 case DLM_MSG_REQUEST_REPLY:
3546 case DLM_MSG_CONVERT_REPLY:
3547 case DLM_MSG_GRANT:
3548 if (lkb && lkb->lkb_lvbptr && (lkb->lkb_exflags & DLM_LKF_VALBLK))
3549 mb_len += r->res_ls->ls_lvblen;
3550 break;
3551 }
3552
3553 return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
3554 ms_ret, mh_ret);
3555 }
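
/* For example (illustrative): a DLM_MSG_REQUEST for a resource with an
 * 8-byte name is allocated with
 *
 *	mb_len = sizeof(struct dlm_message) + 8;
 *
 * while a DLM_MSG_CONVERT that carries a value block uses
 *
 *	mb_len = sizeof(struct dlm_message) + r->res_ls->ls_lvblen;
 *
 * The variable-length tail is written into ms->m_extra by send_args(),
 * and the receiver recovers its size as h_length - sizeof(struct
 * dlm_message), see receive_extralen(). */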
3556
3557 /* further lowcomms enhancements or alternate implementations may make
3558 the return value from this function useful at some point */
3559
3560 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms,
3561 const void *name, int namelen)
3562 {
3563 dlm_midcomms_commit_mhandle(mh, name, namelen);
3564 return 0;
3565 }
3566
3567 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
3568 struct dlm_message *ms)
3569 {
3570 ms->m_nodeid = cpu_to_le32(lkb->lkb_nodeid);
3571 ms->m_pid = cpu_to_le32(lkb->lkb_ownpid);
3572 ms->m_lkid = cpu_to_le32(lkb->lkb_id);
3573 ms->m_remid = cpu_to_le32(lkb->lkb_remid);
3574 ms->m_exflags = cpu_to_le32(lkb->lkb_exflags);
3575 ms->m_sbflags = cpu_to_le32(dlm_sbflags_val(lkb));
3576 ms->m_flags = cpu_to_le32(dlm_dflags_val(lkb));
3577 ms->m_lvbseq = cpu_to_le32(lkb->lkb_lvbseq);
3578 ms->m_status = cpu_to_le32(lkb->lkb_status);
3579 ms->m_grmode = cpu_to_le32(lkb->lkb_grmode);
3580 ms->m_rqmode = cpu_to_le32(lkb->lkb_rqmode);
3581 ms->m_hash = cpu_to_le32(r->res_hash);
3582
3583 /* m_result and m_bastmode are set from function args,
3584 not from lkb fields */
3585
3586 if (lkb->lkb_bastfn)
3587 ms->m_asts |= cpu_to_le32(DLM_CB_BAST);
3588 if (lkb->lkb_astfn)
3589 ms->m_asts |= cpu_to_le32(DLM_CB_CAST);
3590
3591 /* compare with switch in create_message; send_remove() doesn't
3592 use send_args() */
3593
3594 switch (ms->m_type) {
3595 case cpu_to_le32(DLM_MSG_REQUEST):
3596 case cpu_to_le32(DLM_MSG_LOOKUP):
3597 memcpy(ms->m_extra, r->res_name, r->res_length);
3598 break;
3599 case cpu_to_le32(DLM_MSG_CONVERT):
3600 case cpu_to_le32(DLM_MSG_UNLOCK):
3601 case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3602 case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3603 case cpu_to_le32(DLM_MSG_GRANT):
3604 if (!lkb->lkb_lvbptr || !(lkb->lkb_exflags & DLM_LKF_VALBLK))
3605 break;
3606 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
3607 break;
3608 }
3609 }
3610
3611 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
3612 {
3613 struct dlm_message *ms;
3614 struct dlm_mhandle *mh;
3615 int to_nodeid, error;
3616
3617 to_nodeid = r->res_nodeid;
3618
3619 add_to_waiters(lkb, mstype, to_nodeid);
3620 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3621 if (error)
3622 goto fail;
3623
3624 send_args(r, lkb, ms);
3625
3626 error = send_message(mh, ms, r->res_name, r->res_length);
3627 if (error)
3628 goto fail;
3629 return 0;
3630
3631 fail:
3632 remove_from_waiters(lkb, msg_reply_type(mstype));
3633 return error;
3634 }
3635
3636 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
3637 {
3638 return send_common(r, lkb, DLM_MSG_REQUEST);
3639 }
3640
3641 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
3642 {
3643 int error;
3644
3645 error = send_common(r, lkb, DLM_MSG_CONVERT);
3646
3647 /* down conversions go without a reply from the master */
3648 if (!error && down_conversion(lkb)) {
3649 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
3650 r->res_ls->ls_local_ms.m_type = cpu_to_le32(DLM_MSG_CONVERT_REPLY);
3651 r->res_ls->ls_local_ms.m_result = 0;
3652 __receive_convert_reply(r, lkb, &r->res_ls->ls_local_ms, true);
3653 }
3654
3655 return error;
3656 }
3657
3658 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
3659 MASTER_UNCERTAIN to force the next request on the rsb to confirm
3660 that the master is still correct. */
3661
3662 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
3663 {
3664 return send_common(r, lkb, DLM_MSG_UNLOCK);
3665 }
3666
3667 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3668 {
3669 return send_common(r, lkb, DLM_MSG_CANCEL);
3670 }
3671
3672 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3673 {
3674 struct dlm_message *ms;
3675 struct dlm_mhandle *mh;
3676 int to_nodeid, error;
3677
3678 to_nodeid = lkb->lkb_nodeid;
3679
3680 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3681 if (error)
3682 goto out;
3683
3684 send_args(r, lkb, ms);
3685
3686 ms->m_result = 0;
3687
3688 error = send_message(mh, ms, r->res_name, r->res_length);
3689 out:
3690 return error;
3691 }
3692
3693 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3694 {
3695 struct dlm_message *ms;
3696 struct dlm_mhandle *mh;
3697 int to_nodeid, error;
3698
3699 to_nodeid = lkb->lkb_nodeid;
3700
3701 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3702 if (error)
3703 goto out;
3704
3705 send_args(r, lkb, ms);
3706
3707 ms->m_bastmode = cpu_to_le32(mode);
3708
3709 error = send_message(mh, ms, r->res_name, r->res_length);
3710 out:
3711 return error;
3712 }
3713
3714 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3715 {
3716 struct dlm_message *ms;
3717 struct dlm_mhandle *mh;
3718 int to_nodeid, error;
3719
3720 to_nodeid = dlm_dir_nodeid(r);
3721
3722 add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3723 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3724 if (error)
3725 goto fail;
3726
3727 send_args(r, lkb, ms);
3728
3729 error = send_message(mh, ms, r->res_name, r->res_length);
3730 if (error)
3731 goto fail;
3732 return 0;
3733
3734 fail:
3735 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3736 return error;
3737 }
3738
3739 static int send_remove(struct dlm_rsb *r)
3740 {
3741 struct dlm_message *ms;
3742 struct dlm_mhandle *mh;
3743 int to_nodeid, error;
3744
3745 to_nodeid = dlm_dir_nodeid(r);
3746
3747 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3748 if (error)
3749 goto out;
3750
3751 memcpy(ms->m_extra, r->res_name, r->res_length);
3752 ms->m_hash = cpu_to_le32(r->res_hash);
3753
3754 error = send_message(mh, ms, r->res_name, r->res_length);
3755 out:
3756 return error;
3757 }
3758
3759 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3760 int mstype, int rv)
3761 {
3762 struct dlm_message *ms;
3763 struct dlm_mhandle *mh;
3764 int to_nodeid, error;
3765
3766 to_nodeid = lkb->lkb_nodeid;
3767
3768 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3769 if (error)
3770 goto out;
3771
3772 send_args(r, lkb, ms);
3773
3774 ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3775
3776 error = send_message(mh, ms, r->res_name, r->res_length);
3777 out:
3778 return error;
3779 }
3780
3781 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3782 {
3783 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3784 }
3785
3786 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3787 {
3788 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3789 }
3790
3791 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3792 {
3793 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3794 }
3795
3796 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3797 {
3798 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3799 }
3800
3801 static int send_lookup_reply(struct dlm_ls *ls,
3802 const struct dlm_message *ms_in, int ret_nodeid,
3803 int rv)
3804 {
3805 struct dlm_rsb *r = &ls->ls_local_rsb;
3806 struct dlm_message *ms;
3807 struct dlm_mhandle *mh;
3808 int error, nodeid = le32_to_cpu(ms_in->m_header.h_nodeid);
3809
3810 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3811 if (error)
3812 goto out;
3813
3814 ms->m_lkid = ms_in->m_lkid;
3815 ms->m_result = cpu_to_le32(to_dlm_errno(rv));
3816 ms->m_nodeid = cpu_to_le32(ret_nodeid);
3817
3818 error = send_message(mh, ms, ms_in->m_extra, receive_extralen(ms_in));
3819 out:
3820 return error;
3821 }
3822
3823 /* which args we save from a received message depends heavily on the type
3824 of message, unlike the send side where we can safely send everything about
3825 the lkb for any type of message */
3826
3827 static void receive_flags(struct dlm_lkb *lkb, const struct dlm_message *ms)
3828 {
3829 lkb->lkb_exflags = le32_to_cpu(ms->m_exflags);
3830 dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3831 dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3832 }
3833
3834 static void receive_flags_reply(struct dlm_lkb *lkb,
3835 const struct dlm_message *ms,
3836 bool local)
3837 {
3838 if (local)
3839 return;
3840
3841 dlm_set_sbflags_val(lkb, le32_to_cpu(ms->m_sbflags));
3842 dlm_set_dflags_val(lkb, le32_to_cpu(ms->m_flags));
3843 }
3844
3845 static int receive_extralen(const struct dlm_message *ms)
3846 {
3847 return (le16_to_cpu(ms->m_header.h_length) -
3848 sizeof(struct dlm_message));
3849 }
3850
3851 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3852 const struct dlm_message *ms)
3853 {
3854 int len;
3855
3856 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3857 if (!lkb->lkb_lvbptr)
3858 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3859 if (!lkb->lkb_lvbptr)
3860 return -ENOMEM;
3861 len = receive_extralen(ms);
3862 if (len > ls->ls_lvblen)
3863 len = ls->ls_lvblen;
3864 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3865 }
3866 return 0;
3867 }
3868
3869 static void fake_bastfn(void *astparam, int mode)
3870 {
3871 log_print("fake_bastfn should not be called");
3872 }
3873
3874 static void fake_astfn(void *astparam)
3875 {
3876 log_print("fake_astfn should not be called");
3877 }
3878
3879 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3880 const struct dlm_message *ms)
3881 {
3882 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3883 lkb->lkb_ownpid = le32_to_cpu(ms->m_pid);
3884 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3885 lkb->lkb_grmode = DLM_LOCK_IV;
3886 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3887
3888 lkb->lkb_bastfn = (ms->m_asts & cpu_to_le32(DLM_CB_BAST)) ? &fake_bastfn : NULL;
3889 lkb->lkb_astfn = (ms->m_asts & cpu_to_le32(DLM_CB_CAST)) ? &fake_astfn : NULL;
3890
3891 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3892 /* lkb was just created so there won't be an lvb yet */
3893 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3894 if (!lkb->lkb_lvbptr)
3895 return -ENOMEM;
3896 }
3897
3898 return 0;
3899 }
3900
3901 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3902 const struct dlm_message *ms)
3903 {
3904 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3905 return -EBUSY;
3906
3907 if (receive_lvb(ls, lkb, ms))
3908 return -ENOMEM;
3909
3910 lkb->lkb_rqmode = le32_to_cpu(ms->m_rqmode);
3911 lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
3912
3913 return 0;
3914 }
3915
3916 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3917 const struct dlm_message *ms)
3918 {
3919 if (receive_lvb(ls, lkb, ms))
3920 return -ENOMEM;
3921 return 0;
3922 }
3923
3924 /* We fill in the local-lkb fields with the info that send_xxxx_reply()
3925 uses to send a reply and that the remote end uses to process the reply. */
3926
3927 static void setup_local_lkb(struct dlm_ls *ls, const struct dlm_message *ms)
3928 {
3929 struct dlm_lkb *lkb = &ls->ls_local_lkb;
3930 lkb->lkb_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3931 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
3932 }
3933
3934 /* This is called after the rsb is locked so that we can safely inspect
3935 fields in the lkb. */
3936
3937 static int validate_message(struct dlm_lkb *lkb, const struct dlm_message *ms)
3938 {
3939 int from = le32_to_cpu(ms->m_header.h_nodeid);
3940 int error = 0;
3941
3942 /* currently mixing of user/kernel locks is not supported */
3943 if (ms->m_flags & cpu_to_le32(BIT(DLM_DFL_USER_BIT)) &&
3944 !test_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags)) {
3945 log_error(lkb->lkb_resource->res_ls,
3946 "got user dlm message for a kernel lock");
3947 error = -EINVAL;
3948 goto out;
3949 }
3950
3951 switch (ms->m_type) {
3952 case cpu_to_le32(DLM_MSG_CONVERT):
3953 case cpu_to_le32(DLM_MSG_UNLOCK):
3954 case cpu_to_le32(DLM_MSG_CANCEL):
3955 if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3956 error = -EINVAL;
3957 break;
3958
3959 case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
3960 case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
3961 case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
3962 case cpu_to_le32(DLM_MSG_GRANT):
3963 case cpu_to_le32(DLM_MSG_BAST):
3964 if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3965 error = -EINVAL;
3966 break;
3967
3968 case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
3969 if (!is_process_copy(lkb))
3970 error = -EINVAL;
3971 else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3972 error = -EINVAL;
3973 break;
3974
3975 default:
3976 error = -EINVAL;
3977 }
3978
3979 out:
3980 if (error)
3981 log_error(lkb->lkb_resource->res_ls,
3982 "ignore invalid message %d from %d %x %x %x %d",
3983 le32_to_cpu(ms->m_type), from, lkb->lkb_id,
3984 lkb->lkb_remid, dlm_iflags_val(lkb),
3985 lkb->lkb_nodeid);
3986 return error;
3987 }
3988
3989 static int receive_request(struct dlm_ls *ls, const struct dlm_message *ms)
3990 {
3991 struct dlm_lkb *lkb;
3992 struct dlm_rsb *r;
3993 int from_nodeid;
3994 int error, namelen = 0;
3995
3996 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
3997
3998 error = create_lkb(ls, &lkb);
3999 if (error)
4000 goto fail;
4001
4002 receive_flags(lkb, ms);
4003 set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
4004 error = receive_request_args(ls, lkb, ms);
4005 if (error) {
4006 __put_lkb(ls, lkb);
4007 goto fail;
4008 }
4009
4010 /* The dir node is the authority on whether we are the master
4011 for this rsb or not, so if the dir node sends us a request, we should
4012 recreate the rsb if we've destroyed it. This race happens when we
4013 send a remove message to the dir node at the same time that the dir
4014 node sends us a request for the rsb. */
4015
4016 namelen = receive_extralen(ms);
4017
4018 error = find_rsb(ls, ms->m_extra, namelen, from_nodeid,
4019 R_RECEIVE_REQUEST, &r);
4020 if (error) {
4021 __put_lkb(ls, lkb);
4022 goto fail;
4023 }
4024
4025 lock_rsb(r);
4026
4027 if (r->res_master_nodeid != dlm_our_nodeid()) {
4028 error = validate_master_nodeid(ls, r, from_nodeid);
4029 if (error) {
4030 unlock_rsb(r);
4031 put_rsb(r);
4032 __put_lkb(ls, lkb);
4033 goto fail;
4034 }
4035 }
4036
4037 attach_lkb(r, lkb);
4038 error = do_request(r, lkb);
4039 send_request_reply(r, lkb, error);
4040 do_request_effects(r, lkb, error);
4041
4042 unlock_rsb(r);
4043 put_rsb(r);
4044
4045 if (error == -EINPROGRESS)
4046 error = 0;
4047 if (error)
4048 dlm_put_lkb(lkb);
4049 return 0;
4050
4051 fail:
4052 /* TODO: instead of returning ENOTBLK, add the lkb to res_lookup
4053 and do this receive_request again from process_lookup_list once
4054 we get the lookup reply. This would avoid many repeated
4055 ENOTBLK request failures when the lookup reply designating us
4056 as master is delayed. */
4057
4058 if (error != -ENOTBLK) {
4059 log_limit(ls, "receive_request %x from %d %d",
4060 le32_to_cpu(ms->m_lkid), from_nodeid, error);
4061 }
4062
4063 setup_local_lkb(ls, ms);
4064 send_request_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4065 return error;
4066 }
4067
4068 static int receive_convert(struct dlm_ls *ls, const struct dlm_message *ms)
4069 {
4070 struct dlm_lkb *lkb;
4071 struct dlm_rsb *r;
4072 int error, reply = 1;
4073
4074 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4075 if (error)
4076 goto fail;
4077
4078 if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4079 log_error(ls, "receive_convert %x remid %x recover_seq %llu "
4080 "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
4081 (unsigned long long)lkb->lkb_recover_seq,
4082 le32_to_cpu(ms->m_header.h_nodeid),
4083 le32_to_cpu(ms->m_lkid));
4084 error = -ENOENT;
4085 dlm_put_lkb(lkb);
4086 goto fail;
4087 }
4088
4089 r = lkb->lkb_resource;
4090
4091 hold_rsb(r);
4092 lock_rsb(r);
4093
4094 error = validate_message(lkb, ms);
4095 if (error)
4096 goto out;
4097
4098 receive_flags(lkb, ms);
4099
4100 error = receive_convert_args(ls, lkb, ms);
4101 if (error) {
4102 send_convert_reply(r, lkb, error);
4103 goto out;
4104 }
4105
4106 reply = !down_conversion(lkb);
4107
4108 error = do_convert(r, lkb);
4109 if (reply)
4110 send_convert_reply(r, lkb, error);
4111 do_convert_effects(r, lkb, error);
4112 out:
4113 unlock_rsb(r);
4114 put_rsb(r);
4115 dlm_put_lkb(lkb);
4116 return 0;
4117
4118 fail:
4119 setup_local_lkb(ls, ms);
4120 send_convert_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4121 return error;
4122 }
4123
4124 static int receive_unlock(struct dlm_ls *ls, const struct dlm_message *ms)
4125 {
4126 struct dlm_lkb *lkb;
4127 struct dlm_rsb *r;
4128 int error;
4129
4130 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4131 if (error)
4132 goto fail;
4133
4134 if (lkb->lkb_remid != le32_to_cpu(ms->m_lkid)) {
4135 log_error(ls, "receive_unlock %x remid %x remote %d %x",
4136 lkb->lkb_id, lkb->lkb_remid,
4137 le32_to_cpu(ms->m_header.h_nodeid),
4138 le32_to_cpu(ms->m_lkid));
4139 error = -ENOENT;
4140 dlm_put_lkb(lkb);
4141 goto fail;
4142 }
4143
4144 r = lkb->lkb_resource;
4145
4146 hold_rsb(r);
4147 lock_rsb(r);
4148
4149 error = validate_message(lkb, ms);
4150 if (error)
4151 goto out;
4152
4153 receive_flags(lkb, ms);
4154
4155 error = receive_unlock_args(ls, lkb, ms);
4156 if (error) {
4157 send_unlock_reply(r, lkb, error);
4158 goto out;
4159 }
4160
4161 error = do_unlock(r, lkb);
4162 send_unlock_reply(r, lkb, error);
4163 do_unlock_effects(r, lkb, error);
4164 out:
4165 unlock_rsb(r);
4166 put_rsb(r);
4167 dlm_put_lkb(lkb);
4168 return 0;
4169
4170 fail:
4171 setup_local_lkb(ls, ms);
4172 send_unlock_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4173 return error;
4174 }
4175
4176 static int receive_cancel(struct dlm_ls *ls, const struct dlm_message *ms)
4177 {
4178 struct dlm_lkb *lkb;
4179 struct dlm_rsb *r;
4180 int error;
4181
4182 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4183 if (error)
4184 goto fail;
4185
4186 receive_flags(lkb, ms);
4187
4188 r = lkb->lkb_resource;
4189
4190 hold_rsb(r);
4191 lock_rsb(r);
4192
4193 error = validate_message(lkb, ms);
4194 if (error)
4195 goto out;
4196
4197 error = do_cancel(r, lkb);
4198 send_cancel_reply(r, lkb, error);
4199 do_cancel_effects(r, lkb, error);
4200 out:
4201 unlock_rsb(r);
4202 put_rsb(r);
4203 dlm_put_lkb(lkb);
4204 return 0;
4205
4206 fail:
4207 setup_local_lkb(ls, ms);
4208 send_cancel_reply(&ls->ls_local_rsb, &ls->ls_local_lkb, error);
4209 return error;
4210 }
4211
4212 static int receive_grant(struct dlm_ls *ls, const struct dlm_message *ms)
4213 {
4214 struct dlm_lkb *lkb;
4215 struct dlm_rsb *r;
4216 int error;
4217
4218 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4219 if (error)
4220 return error;
4221
4222 r = lkb->lkb_resource;
4223
4224 hold_rsb(r);
4225 lock_rsb(r);
4226
4227 error = validate_message(lkb, ms);
4228 if (error)
4229 goto out;
4230
4231 receive_flags_reply(lkb, ms, false);
4232 if (is_altmode(lkb))
4233 munge_altmode(lkb, ms);
4234 grant_lock_pc(r, lkb, ms);
4235 queue_cast(r, lkb, 0);
4236 out:
4237 unlock_rsb(r);
4238 put_rsb(r);
4239 dlm_put_lkb(lkb);
4240 return 0;
4241 }
4242
4243 static int receive_bast(struct dlm_ls *ls, const struct dlm_message *ms)
4244 {
4245 struct dlm_lkb *lkb;
4246 struct dlm_rsb *r;
4247 int error;
4248
4249 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4250 if (error)
4251 return error;
4252
4253 r = lkb->lkb_resource;
4254
4255 hold_rsb(r);
4256 lock_rsb(r);
4257
4258 error = validate_message(lkb, ms);
4259 if (error)
4260 goto out;
4261
4262 queue_bast(r, lkb, le32_to_cpu(ms->m_bastmode));
4263 lkb->lkb_highbast = le32_to_cpu(ms->m_bastmode);
4264 out:
4265 unlock_rsb(r);
4266 put_rsb(r);
4267 dlm_put_lkb(lkb);
4268 return 0;
4269 }
4270
4271 static void receive_lookup(struct dlm_ls *ls, const struct dlm_message *ms)
4272 {
4273 int len, error, ret_nodeid, from_nodeid, our_nodeid;
4274
4275 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4276 our_nodeid = dlm_our_nodeid();
4277
4278 len = receive_extralen(ms);
4279
4280 error = dlm_master_lookup(ls, from_nodeid, ms->m_extra, len, 0,
4281 &ret_nodeid, NULL);
4282
4283 /* Optimization: we're master so treat lookup as a request */
4284 if (!error && ret_nodeid == our_nodeid) {
4285 receive_request(ls, ms);
4286 return;
4287 }
4288 send_lookup_reply(ls, ms, ret_nodeid, error);
4289 }
4290
4291 static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
4292 {
4293 char name[DLM_RESNAME_MAXLEN+1];
4294 struct dlm_rsb *r;
4295 int rv, len, dir_nodeid, from_nodeid;
4296
4297 from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4298
4299 len = receive_extralen(ms);
4300
4301 if (len > DLM_RESNAME_MAXLEN) {
4302 log_error(ls, "receive_remove from %d bad len %d",
4303 from_nodeid, len);
4304 return;
4305 }
4306
4307 dir_nodeid = dlm_hash2nodeid(ls, le32_to_cpu(ms->m_hash));
4308 if (dir_nodeid != dlm_our_nodeid()) {
4309 log_error(ls, "receive_remove from %d bad nodeid %d",
4310 from_nodeid, dir_nodeid);
4311 return;
4312 }
4313
4314 /*
4315 * Look for an inactive rsb; if it's there, free it.
4316 * If the rsb is active, it's being used, and we should ignore this
4317 * message. This is an expected race between the dir node sending a
4318 * request to the master node at the same time as the master node sends
4319 * a remove to the dir node. The resolution to that race is for the
4320 * dir node to ignore the remove message, and the master node to
4321 * recreate the master rsb when it gets a request from the dir node for
4322 * an rsb it doesn't have.
4323 */
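
	/*
	 * Illustrative interleaving of that race (a sketch; D = this dir
	 * node, M = the master node):
	 *
	 *   M: last lkb dropped, rsb made inactive
	 *   M: send_remove(r)        ----->  (remove in flight to D)
	 *   D: needs a lock on r, knows M is the master,
	 *      sends DLM_MSG_REQUEST ----->  M: receive_request() recreates r
	 *                                       (find_rsb, R_RECEIVE_REQUEST)
	 *   D: receive_remove() finds its rsb for r active, remove is ignored
	 */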
4324
4325 memset(name, 0, sizeof(name));
4326 memcpy(name, ms->m_extra, len);
4327
4328 rcu_read_lock();
4329 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
4330 if (rv) {
4331 rcu_read_unlock();
4332 /* should not happen */
4333 log_error(ls, "%s from %d not found %s", __func__,
4334 from_nodeid, name);
4335 return;
4336 }
4337
4338 write_lock_bh(&ls->ls_rsbtbl_lock);
4339 if (!rsb_flag(r, RSB_HASHED)) {
4340 rcu_read_unlock();
4341 write_unlock_bh(&ls->ls_rsbtbl_lock);
4342 /* should not happen */
4343 log_error(ls, "%s from %d got removed during removal %s",
4344 __func__, from_nodeid, name);
4345 return;
4346 }
4347 /* at this stage the rsb can only be freed here */
4348 rcu_read_unlock();
4349
4350 if (!rsb_flag(r, RSB_INACTIVE)) {
4351 if (r->res_master_nodeid != from_nodeid) {
4352 /* should not happen */
4353 log_error(ls, "receive_remove on active rsb from %d master %d",
4354 from_nodeid, r->res_master_nodeid);
4355 dlm_print_rsb(r);
4356 write_unlock_bh(&ls->ls_rsbtbl_lock);
4357 return;
4358 }
4359
4360 /* Ignore the remove message, see race comment above. */
4361
4362 log_debug(ls, "receive_remove from %d master %d first %x %s",
4363 from_nodeid, r->res_master_nodeid, r->res_first_lkid,
4364 name);
4365 write_unlock_bh(&ls->ls_rsbtbl_lock);
4366 return;
4367 }
4368
4369 if (r->res_master_nodeid != from_nodeid) {
4370 log_error(ls, "receive_remove inactive from %d master %d",
4371 from_nodeid, r->res_master_nodeid);
4372 dlm_print_rsb(r);
4373 write_unlock_bh(&ls->ls_rsbtbl_lock);
4374 return;
4375 }
4376
4377 list_del(&r->res_slow_list);
4378 rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
4379 dlm_rhash_rsb_params);
4380 rsb_clear_flag(r, RSB_HASHED);
4381 write_unlock_bh(&ls->ls_rsbtbl_lock);
4382
4383 free_inactive_rsb(r);
4384 }
4385
4386 static void receive_purge(struct dlm_ls *ls, const struct dlm_message *ms)
4387 {
4388 do_purge(ls, le32_to_cpu(ms->m_nodeid), le32_to_cpu(ms->m_pid));
4389 }
4390
4391 static int receive_request_reply(struct dlm_ls *ls,
4392 const struct dlm_message *ms)
4393 {
4394 struct dlm_lkb *lkb;
4395 struct dlm_rsb *r;
4396 int error, mstype, result;
4397 int from_nodeid = le32_to_cpu(ms->m_header.h_nodeid);
4398
4399 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4400 if (error)
4401 return error;
4402
4403 r = lkb->lkb_resource;
4404 hold_rsb(r);
4405 lock_rsb(r);
4406
4407 error = validate_message(lkb, ms);
4408 if (error)
4409 goto out;
4410
4411 mstype = lkb->lkb_wait_type;
4412 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
4413 if (error) {
4414 log_error(ls, "receive_request_reply %x remote %d %x result %d",
4415 lkb->lkb_id, from_nodeid, le32_to_cpu(ms->m_lkid),
4416 from_dlm_errno(le32_to_cpu(ms->m_result)));
4417 dlm_dump_rsb(r);
4418 goto out;
4419 }
4420
4421 /* Optimization: the dir node was also the master, so it took our
4422 lookup as a request and sent request reply instead of lookup reply */
4423 if (mstype == DLM_MSG_LOOKUP) {
4424 r->res_master_nodeid = from_nodeid;
4425 r->res_nodeid = from_nodeid;
4426 lkb->lkb_nodeid = from_nodeid;
4427 }
4428
4429 /* this is the value returned from do_request() on the master */
4430 result = from_dlm_errno(le32_to_cpu(ms->m_result));
4431
4432 switch (result) {
4433 case -EAGAIN:
4434 /* request would block (be queued) on remote master */
4435 queue_cast(r, lkb, -EAGAIN);
4436 confirm_master(r, -EAGAIN);
4437 unhold_lkb(lkb); /* undoes create_lkb() */
4438 break;
4439
4440 case -EINPROGRESS:
4441 case 0:
4442 /* request was queued or granted on remote master */
4443 receive_flags_reply(lkb, ms, false);
4444 lkb->lkb_remid = le32_to_cpu(ms->m_lkid);
4445 if (is_altmode(lkb))
4446 munge_altmode(lkb, ms);
4447 if (result) {
4448 add_lkb(r, lkb, DLM_LKSTS_WAITING);
4449 } else {
4450 grant_lock_pc(r, lkb, ms);
4451 queue_cast(r, lkb, 0);
4452 }
4453 confirm_master(r, result);
4454 break;
4455
4456 case -EBADR:
4457 case -ENOTBLK:
4458 /* find_rsb failed to find rsb or rsb wasn't master */
4459 log_limit(ls, "receive_request_reply %x from %d %d "
4460 "master %d dir %d first %x %s", lkb->lkb_id,
4461 from_nodeid, result, r->res_master_nodeid,
4462 r->res_dir_nodeid, r->res_first_lkid, r->res_name);
4463
4464 if (r->res_dir_nodeid != dlm_our_nodeid() &&
4465 r->res_master_nodeid != dlm_our_nodeid()) {
4466 /* cause _request_lock->set_master->send_lookup */
4467 r->res_master_nodeid = 0;
4468 r->res_nodeid = -1;
4469 lkb->lkb_nodeid = -1;
4470 }
4471
4472 if (is_overlap(lkb)) {
4473 /* we'll ignore error in cancel/unlock reply */
4474 queue_cast_overlap(r, lkb);
4475 confirm_master(r, result);
4476 unhold_lkb(lkb); /* undoes create_lkb() */
4477 } else {
4478 _request_lock(r, lkb);
4479
4480 if (r->res_master_nodeid == dlm_our_nodeid())
4481 confirm_master(r, 0);
4482 }
4483 break;
4484
4485 default:
4486 log_error(ls, "receive_request_reply %x error %d",
4487 lkb->lkb_id, result);
4488 }
4489
4490 if ((result == 0 || result == -EINPROGRESS) &&
4491 test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags)) {
4492 log_debug(ls, "receive_request_reply %x result %d unlock",
4493 lkb->lkb_id, result);
4494 clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4495 send_unlock(r, lkb);
4496 } else if ((result == -EINPROGRESS) &&
4497 test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
4498 &lkb->lkb_iflags)) {
4499 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
4500 clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4501 send_cancel(r, lkb);
4502 } else {
4503 clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags);
4504 clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags);
4505 }
4506 out:
4507 unlock_rsb(r);
4508 put_rsb(r);
4509 dlm_put_lkb(lkb);
4510 return 0;
4511 }
4512
4513 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
4514 const struct dlm_message *ms, bool local)
4515 {
4516 /* this is the value returned from do_convert() on the master */
4517 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4518 case -EAGAIN:
4519 /* convert would block (be queued) on remote master */
4520 queue_cast(r, lkb, -EAGAIN);
4521 break;
4522
4523 case -EDEADLK:
4524 receive_flags_reply(lkb, ms, local);
4525 revert_lock_pc(r, lkb);
4526 queue_cast(r, lkb, -EDEADLK);
4527 break;
4528
4529 case -EINPROGRESS:
4530 /* convert was queued on remote master */
4531 receive_flags_reply(lkb, ms, local);
4532 if (is_demoted(lkb))
4533 munge_demoted(lkb);
4534 del_lkb(r, lkb);
4535 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
4536 break;
4537
4538 case 0:
4539 /* convert was granted on remote master */
4540 receive_flags_reply(lkb, ms, local);
4541 if (is_demoted(lkb))
4542 munge_demoted(lkb);
4543 grant_lock_pc(r, lkb, ms);
4544 queue_cast(r, lkb, 0);
4545 break;
4546
4547 default:
4548 log_error(r->res_ls, "receive_convert_reply %x remote %d %x %d",
4549 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4550 le32_to_cpu(ms->m_lkid),
4551 from_dlm_errno(le32_to_cpu(ms->m_result)));
4552 dlm_print_rsb(r);
4553 dlm_print_lkb(lkb);
4554 }
4555 }
4556
4557 static void _receive_convert_reply(struct dlm_lkb *lkb,
4558 const struct dlm_message *ms, bool local)
4559 {
4560 struct dlm_rsb *r = lkb->lkb_resource;
4561 int error;
4562
4563 hold_rsb(r);
4564 lock_rsb(r);
4565
4566 error = validate_message(lkb, ms);
4567 if (error)
4568 goto out;
4569
4570 error = remove_from_waiters_ms(lkb, ms, local);
4571 if (error)
4572 goto out;
4573
4574 __receive_convert_reply(r, lkb, ms, local);
4575 out:
4576 unlock_rsb(r);
4577 put_rsb(r);
4578 }
4579
4580 static int receive_convert_reply(struct dlm_ls *ls,
4581 const struct dlm_message *ms)
4582 {
4583 struct dlm_lkb *lkb;
4584 int error;
4585
4586 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4587 if (error)
4588 return error;
4589
4590 _receive_convert_reply(lkb, ms, false);
4591 dlm_put_lkb(lkb);
4592 return 0;
4593 }
4594
4595 static void _receive_unlock_reply(struct dlm_lkb *lkb,
4596 const struct dlm_message *ms, bool local)
4597 {
4598 struct dlm_rsb *r = lkb->lkb_resource;
4599 int error;
4600
4601 hold_rsb(r);
4602 lock_rsb(r);
4603
4604 error = validate_message(lkb, ms);
4605 if (error)
4606 goto out;
4607
4608 error = remove_from_waiters_ms(lkb, ms, local);
4609 if (error)
4610 goto out;
4611
4612 /* this is the value returned from do_unlock() on the master */
4613
4614 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4615 case -DLM_EUNLOCK:
4616 receive_flags_reply(lkb, ms, local);
4617 remove_lock_pc(r, lkb);
4618 queue_cast(r, lkb, -DLM_EUNLOCK);
4619 break;
4620 case -ENOENT:
4621 break;
4622 default:
4623 log_error(r->res_ls, "receive_unlock_reply %x error %d",
4624 lkb->lkb_id, from_dlm_errno(le32_to_cpu(ms->m_result)));
4625 }
4626 out:
4627 unlock_rsb(r);
4628 put_rsb(r);
4629 }
4630
4631 static int receive_unlock_reply(struct dlm_ls *ls,
4632 const struct dlm_message *ms)
4633 {
4634 struct dlm_lkb *lkb;
4635 int error;
4636
4637 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4638 if (error)
4639 return error;
4640
4641 _receive_unlock_reply(lkb, ms, false);
4642 dlm_put_lkb(lkb);
4643 return 0;
4644 }
4645
4646 static void _receive_cancel_reply(struct dlm_lkb *lkb,
4647 const struct dlm_message *ms, bool local)
4648 {
4649 struct dlm_rsb *r = lkb->lkb_resource;
4650 int error;
4651
4652 hold_rsb(r);
4653 lock_rsb(r);
4654
4655 error = validate_message(lkb, ms);
4656 if (error)
4657 goto out;
4658
4659 error = remove_from_waiters_ms(lkb, ms, local);
4660 if (error)
4661 goto out;
4662
4663 /* this is the value returned from do_cancel() on the master */
4664
4665 switch (from_dlm_errno(le32_to_cpu(ms->m_result))) {
4666 case -DLM_ECANCEL:
4667 receive_flags_reply(lkb, ms, local);
4668 revert_lock_pc(r, lkb);
4669 queue_cast(r, lkb, -DLM_ECANCEL);
4670 break;
4671 case 0:
4672 break;
4673 default:
4674 log_error(r->res_ls, "receive_cancel_reply %x error %d",
4675 lkb->lkb_id,
4676 from_dlm_errno(le32_to_cpu(ms->m_result)));
4677 }
4678 out:
4679 unlock_rsb(r);
4680 put_rsb(r);
4681 }
4682
4683 static int receive_cancel_reply(struct dlm_ls *ls,
4684 const struct dlm_message *ms)
4685 {
4686 struct dlm_lkb *lkb;
4687 int error;
4688
4689 error = find_lkb(ls, le32_to_cpu(ms->m_remid), &lkb);
4690 if (error)
4691 return error;
4692
4693 _receive_cancel_reply(lkb, ms, false);
4694 dlm_put_lkb(lkb);
4695 return 0;
4696 }
4697
4698 static void receive_lookup_reply(struct dlm_ls *ls,
4699 const struct dlm_message *ms)
4700 {
4701 struct dlm_lkb *lkb;
4702 struct dlm_rsb *r;
4703 int error, ret_nodeid;
4704 int do_lookup_list = 0;
4705
4706 error = find_lkb(ls, le32_to_cpu(ms->m_lkid), &lkb);
4707 if (error) {
4708 log_error(ls, "%s no lkid %x", __func__,
4709 le32_to_cpu(ms->m_lkid));
4710 return;
4711 }
4712
4713 /* ms->m_result is the value returned by dlm_master_lookup on dir node
4714 FIXME: will a non-zero error ever be returned? */
4715
4716 r = lkb->lkb_resource;
4717 hold_rsb(r);
4718 lock_rsb(r);
4719
4720 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
4721 if (error)
4722 goto out;
4723
4724 ret_nodeid = le32_to_cpu(ms->m_nodeid);
4725
4726 /* We sometimes receive a request from the dir node for this
4727 rsb before we've received the dir node's lookup_reply for it.
4728 The request from the dir node implies we're the master, so we set
4729 ourself as master in receive_request_reply, and verify here that
4730 we are indeed the master. */
4731
4732 if (r->res_master_nodeid && (r->res_master_nodeid != ret_nodeid)) {
4733 /* This should never happen */
4734 log_error(ls, "receive_lookup_reply %x from %d ret %d "
4735 "master %d dir %d our %d first %x %s",
4736 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid),
4737 ret_nodeid, r->res_master_nodeid, r->res_dir_nodeid,
4738 dlm_our_nodeid(), r->res_first_lkid, r->res_name);
4739 }
4740
4741 if (ret_nodeid == dlm_our_nodeid()) {
4742 r->res_master_nodeid = ret_nodeid;
4743 r->res_nodeid = 0;
4744 do_lookup_list = 1;
4745 r->res_first_lkid = 0;
4746 } else if (ret_nodeid == -1) {
4747 /* the remote node doesn't believe it's the dir node */
4748 log_error(ls, "receive_lookup_reply %x from %d bad ret_nodeid",
4749 lkb->lkb_id, le32_to_cpu(ms->m_header.h_nodeid));
4750 r->res_master_nodeid = 0;
4751 r->res_nodeid = -1;
4752 lkb->lkb_nodeid = -1;
4753 } else {
4754 /* set_master() will set lkb_nodeid from r */
4755 r->res_master_nodeid = ret_nodeid;
4756 r->res_nodeid = ret_nodeid;
4757 }
4758
4759 if (is_overlap(lkb)) {
4760 log_debug(ls, "receive_lookup_reply %x unlock %x",
4761 lkb->lkb_id, dlm_iflags_val(lkb));
4762 queue_cast_overlap(r, lkb);
4763 unhold_lkb(lkb); /* undoes create_lkb() */
4764 goto out_list;
4765 }
4766
4767 _request_lock(r, lkb);
4768
4769 out_list:
4770 if (do_lookup_list)
4771 process_lookup_list(r);
4772 out:
4773 unlock_rsb(r);
4774 put_rsb(r);
4775 dlm_put_lkb(lkb);
4776 }
4777
4778 static void _receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4779 uint32_t saved_seq)
4780 {
4781 int error = 0, noent = 0;
4782
4783 if (WARN_ON_ONCE(!dlm_is_member(ls, le32_to_cpu(ms->m_header.h_nodeid)))) {
4784 log_limit(ls, "receive %d from non-member %d %x %x %d",
4785 le32_to_cpu(ms->m_type),
4786 le32_to_cpu(ms->m_header.h_nodeid),
4787 le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4788 from_dlm_errno(le32_to_cpu(ms->m_result)));
4789 return;
4790 }
4791
4792 switch (ms->m_type) {
4793
4794 /* messages sent to a master node */
4795
4796 case cpu_to_le32(DLM_MSG_REQUEST):
4797 error = receive_request(ls, ms);
4798 break;
4799
4800 case cpu_to_le32(DLM_MSG_CONVERT):
4801 error = receive_convert(ls, ms);
4802 break;
4803
4804 case cpu_to_le32(DLM_MSG_UNLOCK):
4805 error = receive_unlock(ls, ms);
4806 break;
4807
4808 case cpu_to_le32(DLM_MSG_CANCEL):
4809 noent = 1;
4810 error = receive_cancel(ls, ms);
4811 break;
4812
4813 /* messages sent from a master node (replies to above) */
4814
4815 case cpu_to_le32(DLM_MSG_REQUEST_REPLY):
4816 error = receive_request_reply(ls, ms);
4817 break;
4818
4819 case cpu_to_le32(DLM_MSG_CONVERT_REPLY):
4820 error = receive_convert_reply(ls, ms);
4821 break;
4822
4823 case cpu_to_le32(DLM_MSG_UNLOCK_REPLY):
4824 error = receive_unlock_reply(ls, ms);
4825 break;
4826
4827 case cpu_to_le32(DLM_MSG_CANCEL_REPLY):
4828 error = receive_cancel_reply(ls, ms);
4829 break;
4830
4831 /* messages sent from a master node (only two types of async msg) */
4832
4833 case cpu_to_le32(DLM_MSG_GRANT):
4834 noent = 1;
4835 error = receive_grant(ls, ms);
4836 break;
4837
4838 case cpu_to_le32(DLM_MSG_BAST):
4839 noent = 1;
4840 error = receive_bast(ls, ms);
4841 break;
4842
4843 /* messages sent to a dir node */
4844
4845 case cpu_to_le32(DLM_MSG_LOOKUP):
4846 receive_lookup(ls, ms);
4847 break;
4848
4849 case cpu_to_le32(DLM_MSG_REMOVE):
4850 receive_remove(ls, ms);
4851 break;
4852
4853 /* messages sent from a dir node (remove has no reply) */
4854
4855 case cpu_to_le32(DLM_MSG_LOOKUP_REPLY):
4856 receive_lookup_reply(ls, ms);
4857 break;
4858
4859 /* other messages */
4860
4861 case cpu_to_le32(DLM_MSG_PURGE):
4862 receive_purge(ls, ms);
4863 break;
4864
4865 default:
4866 log_error(ls, "unknown message type %d",
4867 le32_to_cpu(ms->m_type));
4868 }
4869
4870 /*
4871 * When checking for ENOENT, we're checking the result of
4872 * find_lkb(m_remid):
4873 *
4874 * The lock id referenced in the message wasn't found. This may
4875 * happen in normal usage for the async messages and cancel, so
4876 * only use log_debug for them.
4877 *
4878 * Some errors are expected and normal.
4879 */
4880
4881 if (error == -ENOENT && noent) {
4882 log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
4883 le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4884 le32_to_cpu(ms->m_header.h_nodeid),
4885 le32_to_cpu(ms->m_lkid), saved_seq);
4886 } else if (error == -ENOENT) {
4887 log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
4888 le32_to_cpu(ms->m_type), le32_to_cpu(ms->m_remid),
4889 le32_to_cpu(ms->m_header.h_nodeid),
4890 le32_to_cpu(ms->m_lkid), saved_seq);
4891
4892 if (ms->m_type == cpu_to_le32(DLM_MSG_CONVERT))
4893 dlm_dump_rsb_hash(ls, le32_to_cpu(ms->m_hash));
4894 }
4895
4896 if (error == -EINVAL) {
4897 log_error(ls, "receive %d inval from %d lkid %x remid %x "
4898 "saved_seq %u",
4899 le32_to_cpu(ms->m_type),
4900 le32_to_cpu(ms->m_header.h_nodeid),
4901 le32_to_cpu(ms->m_lkid), le32_to_cpu(ms->m_remid),
4902 saved_seq);
4903 }
4904 }
4905
4906 /* If the lockspace is in recovery mode (locking stopped), then normal
4907 messages are saved on the requestqueue for processing after recovery is
4908 done. When not in recovery mode, we wait for dlm_recoverd to drain saved
4909 messages off the requestqueue before we process new ones. This occurs right
4910 after recovery completes when we transition from saving all messages on
4911 requestqueue, to processing all the saved messages, to processing new
4912 messages as they arrive. */
4913
4914 static void dlm_receive_message(struct dlm_ls *ls, const struct dlm_message *ms,
4915 int nodeid)
4916 {
4917 try_again:
4918 read_lock_bh(&ls->ls_requestqueue_lock);
4919 if (test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4920 /* If we were a member of this lockspace, left, and rejoined,
4921 other nodes may still be sending us messages from the
4922 lockspace generation before we left. */
4923 if (WARN_ON_ONCE(!ls->ls_generation)) {
4924 read_unlock_bh(&ls->ls_requestqueue_lock);
4925 log_limit(ls, "receive %d from %d ignore old gen",
4926 le32_to_cpu(ms->m_type), nodeid);
4927 return;
4928 }
4929
4930 read_unlock_bh(&ls->ls_requestqueue_lock);
4931 write_lock_bh(&ls->ls_requestqueue_lock);
4932 /* recheck because we hold writelock now */
4933 if (!test_bit(LSFL_RECV_MSG_BLOCKED, &ls->ls_flags)) {
4934 write_unlock_bh(&ls->ls_requestqueue_lock);
4935 goto try_again;
4936 }
4937
4938 dlm_add_requestqueue(ls, nodeid, ms);
4939 write_unlock_bh(&ls->ls_requestqueue_lock);
4940 } else {
4941 _receive_message(ls, ms, 0);
4942 read_unlock_bh(&ls->ls_requestqueue_lock);
4943 }
4944 }
4945
4946 /* This is called by dlm_recoverd to process messages that were saved on
4947 the requestqueue. */
4948
4949 void dlm_receive_message_saved(struct dlm_ls *ls, const struct dlm_message *ms,
4950 uint32_t saved_seq)
4951 {
4952 _receive_message(ls, ms, saved_seq);
4953 }
4954
4955 /* This is called by the midcomms layer when something is received for
4956 the lockspace. It could be either a MSG (normal message sent as part of
4957 standard locking activity) or an RCOM (recovery message sent as part of
4958 lockspace recovery). */
4959
4960 void dlm_receive_buffer(const union dlm_packet *p, int nodeid)
4961 {
4962 const struct dlm_header *hd = &p->header;
4963 struct dlm_ls *ls;
4964 int type = 0;
4965
4966 switch (hd->h_cmd) {
4967 case DLM_MSG:
4968 type = le32_to_cpu(p->message.m_type);
4969 break;
4970 case DLM_RCOM:
4971 type = le32_to_cpu(p->rcom.rc_type);
4972 break;
4973 default:
4974 log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4975 return;
4976 }
4977
4978 if (le32_to_cpu(hd->h_nodeid) != nodeid) {
4979 log_print("invalid h_nodeid %d from %d lockspace %x",
4980 le32_to_cpu(hd->h_nodeid), nodeid,
4981 le32_to_cpu(hd->u.h_lockspace));
4982 return;
4983 }
4984
4985 ls = dlm_find_lockspace_global(le32_to_cpu(hd->u.h_lockspace));
4986 if (!ls) {
4987 if (dlm_config.ci_log_debug) {
4988 printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
4989 "%u from %d cmd %d type %d\n",
4990 le32_to_cpu(hd->u.h_lockspace), nodeid,
4991 hd->h_cmd, type);
4992 }
4993
4994 if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4995 dlm_send_ls_not_ready(nodeid, &p->rcom);
4996 return;
4997 }
4998
4999 /* this rwlock allows dlm_ls_stop() to wait for all dlm_recv threads to
5000 be inactive (in this ls) before transitioning to recovery mode */
5001
5002 read_lock_bh(&ls->ls_recv_active);
5003 if (hd->h_cmd == DLM_MSG)
5004 dlm_receive_message(ls, &p->message, nodeid);
5005 else if (hd->h_cmd == DLM_RCOM)
5006 dlm_receive_rcom(ls, &p->rcom, nodeid);
5007 else
5008 log_error(ls, "invalid h_cmd %d from %d lockspace %x",
5009 hd->h_cmd, nodeid, le32_to_cpu(hd->u.h_lockspace));
5010 read_unlock_bh(&ls->ls_recv_active);
5011
5012 dlm_put_lockspace(ls);
5013 }
5014
5015 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
5016 struct dlm_message *ms_local)
5017 {
5018 if (middle_conversion(lkb) || lkb->lkb_rqmode >= lkb->lkb_grmode)
5019 set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5020
5021 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
5022 conversions are async; there's no reply from the remote master */
5023 }
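
/* For instance (illustrative, assuming the usual mode ordering
 * NL < CR < CW < PR < PW < EX, and that middle_conversion() covers the
 * PR<->CW pairs mentioned in the recovery comment below):
 *
 *   grmode=PR, rqmode=EX   up-conversion, reply lost     -> RESEND
 *   grmode=PR, rqmode=CW   PR<->CW "middle" conversion   -> RESEND
 *   grmode=EX, rqmode=NL   down-conversion: not expected here, since
 *                          send_convert() already completed it locally
 *                          without waiting for a reply
 */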
5024
5025 /* A waiting lkb needs recovery if the master node has failed, or
5026 the master node is changing (only when no directory is used) */
5027
5028 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
5029 int dir_nodeid)
5030 {
5031 if (dlm_no_directory(ls))
5032 return 1;
5033
5034 if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
5035 return 1;
5036
5037 return 0;
5038 }
5039
5040 /* Recovery for locks that are waiting for replies from nodes that are now
5041 gone. We can just complete unlocks and cancels by faking a reply from the
5042 dead node. Requests and up-conversions we flag to be resent after
5043 recovery. Down-conversions can just be completed with a fake reply like
5044 unlocks. Conversions between PR and CW need special attention. */
5045
5046 void dlm_recover_waiters_pre(struct dlm_ls *ls)
5047 {
5048 struct dlm_lkb *lkb, *safe;
5049 struct dlm_message *ms_local;
5050 int wait_type, local_unlock_result, local_cancel_result;
5051 int dir_nodeid;
5052
5053 ms_local = kmalloc_obj(*ms_local, GFP_KERNEL);
5054 if (!ms_local)
5055 return;
5056
5057 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
5058
5059 dir_nodeid = dlm_dir_nodeid(lkb->lkb_resource);
5060
5061 /* exclude debug messages about unlocks because there can be so
5062 many and they aren't very interesting */
5063
5064 if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
5065 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5066 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d",
5067 lkb->lkb_id,
5068 lkb->lkb_remid,
5069 lkb->lkb_wait_type,
5070 lkb->lkb_resource->res_nodeid,
5071 lkb->lkb_nodeid,
5072 lkb->lkb_wait_nodeid,
5073 dir_nodeid);
5074 }
5075
5076 /* all outstanding lookups, regardless of destination, will be
5077 resent after recovery is done */
5078
5079 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
5080 set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5081 continue;
5082 }
5083
5084 if (!waiter_needs_recovery(ls, lkb, dir_nodeid))
5085 continue;
5086
5087 wait_type = lkb->lkb_wait_type;
5088 local_unlock_result = -DLM_EUNLOCK;
5089 local_cancel_result = -DLM_ECANCEL;
5090
5091 /* Main reply may have been received leaving a zero wait_type,
5092 but a reply for the overlapping op may not have been
5093 received. In that case we need to fake the appropriate
5094 reply for the overlap op. */
5095
5096 if (!wait_type) {
5097 if (is_overlap_cancel(lkb)) {
5098 wait_type = DLM_MSG_CANCEL;
5099 if (lkb->lkb_grmode == DLM_LOCK_IV)
5100 local_cancel_result = 0;
5101 }
5102 if (is_overlap_unlock(lkb)) {
5103 wait_type = DLM_MSG_UNLOCK;
5104 if (lkb->lkb_grmode == DLM_LOCK_IV)
5105 local_unlock_result = -ENOENT;
5106 }
5107
5108 log_debug(ls, "rwpre overlap %x %x %d %d %d",
5109 lkb->lkb_id, dlm_iflags_val(lkb), wait_type,
5110 local_cancel_result, local_unlock_result);
5111 }
5112
5113 switch (wait_type) {
5114
5115 case DLM_MSG_REQUEST:
5116 set_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5117 break;
5118
5119 case DLM_MSG_CONVERT:
5120 recover_convert_waiter(ls, lkb, ms_local);
5121 break;
5122
5123 case DLM_MSG_UNLOCK:
5124 hold_lkb(lkb);
5125 memset(ms_local, 0, sizeof(struct dlm_message));
5126 ms_local->m_type = cpu_to_le32(DLM_MSG_UNLOCK_REPLY);
5127 ms_local->m_result = cpu_to_le32(to_dlm_errno(local_unlock_result));
5128 ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5129 _receive_unlock_reply(lkb, ms_local, true);
5130 dlm_put_lkb(lkb);
5131 break;
5132
5133 case DLM_MSG_CANCEL:
5134 hold_lkb(lkb);
5135 memset(ms_local, 0, sizeof(struct dlm_message));
5136 ms_local->m_type = cpu_to_le32(DLM_MSG_CANCEL_REPLY);
5137 ms_local->m_result = cpu_to_le32(to_dlm_errno(local_cancel_result));
5138 ms_local->m_header.h_nodeid = cpu_to_le32(lkb->lkb_nodeid);
5139 _receive_cancel_reply(lkb, ms_local, true);
5140 dlm_put_lkb(lkb);
5141 break;
5142
5143 default:
5144 log_error(ls, "invalid lkb wait_type %d %d",
5145 lkb->lkb_wait_type, wait_type);
5146 }
5147 schedule();
5148 }
5149 kfree(ms_local);
5150 }
5151
5152 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
5153 {
5154 struct dlm_lkb *lkb = NULL, *iter;
5155
5156 spin_lock_bh(&ls->ls_waiters_lock);
5157 list_for_each_entry(iter, &ls->ls_waiters, lkb_wait_reply) {
5158 if (test_bit(DLM_IFL_RESEND_BIT, &iter->lkb_iflags)) {
5159 hold_lkb(iter);
5160 lkb = iter;
5161 break;
5162 }
5163 }
5164 spin_unlock_bh(&ls->ls_waiters_lock);
5165
5166 return lkb;
5167 }
5168
5169 /*
5170 * Forced state reset for locks that were in the middle of remote operations
5171 * when recovery happened (i.e. lkbs that were on the waiters list, waiting
5172 * for a reply from a remote operation.) The lkbs remaining on the waiters
5173 * list need to be reevaluated; some may need resending to a different node
5174 * than previously, and some may now need local handling rather than remote.
5175 *
5176 * First, the lkb state for the voided remote operation is forcibly reset,
5177 * equivalent to what remove_from_waiters() would normally do:
5178 * . lkb removed from ls_waiters list
5179 * . lkb wait_type cleared
5180 * . lkb waiters_count cleared
5181 * . lkb ref count decremented for each waiters_count (almost always 1,
5182 * but possibly 2 in case of cancel/unlock overlapping, which means
5183 * two remote replies were being expected for the lkb.)
5184 *
5185 * Second, the lkb is reprocessed like an original operation would be,
5186 * by passing it to _request_lock or _convert_lock, which will either
5187 * process the lkb operation locally, or send it to a remote node again
5188 * and put the lkb back onto the waiters list.
5189 *
5190 * When reprocessing the lkb, we may find that it's flagged for an overlapping
5191 * force-unlock or cancel, either from before recovery began, or after recovery
5192 * finished. If this is the case, the unlock/cancel is done directly, and the
5193 * original operation is not initiated again (no _request_lock/_convert_lock.)
5194 */
5195
5196 int dlm_recover_waiters_post(struct dlm_ls *ls)
5197 {
5198 struct dlm_lkb *lkb;
5199 struct dlm_rsb *r;
5200 int error = 0, mstype, err, oc, ou;
5201
5202 while (1) {
5203 if (dlm_locking_stopped(ls)) {
5204 log_debug(ls, "recover_waiters_post aborted");
5205 error = -EINTR;
5206 break;
5207 }
5208
5209 /*
5210 * Find an lkb from the waiters list that's been affected by
5211 * recovery node changes, and needs to be reprocessed. Does
5212 * hold_lkb(), adding a refcount.
5213 */
5214 lkb = find_resend_waiter(ls);
5215 if (!lkb)
5216 break;
5217
5218 r = lkb->lkb_resource;
5219 hold_rsb(r);
5220 lock_rsb(r);
5221
5222 /*
5223 * If the lkb has been flagged for a force unlock or cancel,
5224 * then the reprocessing below will be replaced by just doing
5225 * the unlock/cancel directly.
5226 */
5227 mstype = lkb->lkb_wait_type;
5228 oc = test_and_clear_bit(DLM_IFL_OVERLAP_CANCEL_BIT,
5229 &lkb->lkb_iflags);
5230 ou = test_and_clear_bit(DLM_IFL_OVERLAP_UNLOCK_BIT,
5231 &lkb->lkb_iflags);
5232 err = 0;
5233
5234 log_debug(ls, "waiter %x remote %x msg %d r_nodeid %d "
5235 "lkb_nodeid %d wait_nodeid %d dir_nodeid %d "
5236 "overlap %d %d", lkb->lkb_id, lkb->lkb_remid, mstype,
5237 r->res_nodeid, lkb->lkb_nodeid, lkb->lkb_wait_nodeid,
5238 dlm_dir_nodeid(r), oc, ou);
5239
5240 /*
5241 * No reply to the pre-recovery operation will now be received,
5242 * so a forced equivalent of remove_from_waiters() is needed to
5243 * reset the waiters state that was in place before recovery.
5244 */
5245
5246 clear_bit(DLM_IFL_RESEND_BIT, &lkb->lkb_iflags);
5247
5248 /* Forcibly clear wait_type */
5249 lkb->lkb_wait_type = 0;
5250
5251 /*
5252 * Forcibly reset wait_count and associated refcount. The
5253 * wait_count will almost always be 1, but in case of an
5254 * overlapping unlock/cancel it could be 2: see where
5255 * add_to_waiters() finds the lkb is already on the waiters
5256 * list and does lkb_wait_count++; hold_lkb().
5257 */
5258 while (lkb->lkb_wait_count) {
5259 lkb->lkb_wait_count--;
5260 unhold_lkb(lkb);
5261 }
5262
5263 /* Forcibly remove from waiters list */
5264 spin_lock_bh(&ls->ls_waiters_lock);
5265 list_del_init(&lkb->lkb_wait_reply);
5266 spin_unlock_bh(&ls->ls_waiters_lock);
5267
5268 /*
5269 * The lkb is now clear of all prior waiters state and can be
5270 * processed locally, or sent to remote node again, or directly
5271 * cancelled/unlocked.
5272 */
5273
5274 if (oc || ou) {
5275 /* do an unlock or cancel instead of resending */
5276 switch (mstype) {
5277 case DLM_MSG_LOOKUP:
5278 case DLM_MSG_REQUEST:
5279 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
5280 -DLM_ECANCEL);
5281 unhold_lkb(lkb); /* undoes create_lkb() */
5282 break;
5283 case DLM_MSG_CONVERT:
5284 if (oc) {
5285 queue_cast(r, lkb, -DLM_ECANCEL);
5286 } else {
5287 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
5288 _unlock_lock(r, lkb);
5289 }
5290 break;
5291 default:
5292 err = 1;
5293 }
5294 } else {
5295 switch (mstype) {
5296 case DLM_MSG_LOOKUP:
5297 case DLM_MSG_REQUEST:
5298 _request_lock(r, lkb);
5299 if (r->res_nodeid != -1 && is_master(r))
5300 confirm_master(r, 0);
5301 break;
5302 case DLM_MSG_CONVERT:
5303 _convert_lock(r, lkb);
5304 break;
5305 default:
5306 err = 1;
5307 }
5308 }
5309
5310 if (err) {
5311 log_error(ls, "waiter %x msg %d r_nodeid %d "
5312 "dir_nodeid %d overlap %d %d",
5313 lkb->lkb_id, mstype, r->res_nodeid,
5314 dlm_dir_nodeid(r), oc, ou);
5315 }
5316 unlock_rsb(r);
5317 put_rsb(r);
5318 dlm_put_lkb(lkb);
5319 }
5320
5321 return error;
5322 }
5323
5324 static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
5325 struct list_head *list)
5326 {
5327 struct dlm_lkb *lkb, *safe;
5328
5329 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5330 if (!is_master_copy(lkb))
5331 continue;
5332
5333 /* don't purge lkbs we've added in recover_master_copy for
5334 the current recovery seq */
5335
5336 if (lkb->lkb_recover_seq == ls->ls_recover_seq)
5337 continue;
5338
5339 del_lkb(r, lkb);
5340
5341 /* this put should free the lkb */
5342 if (!dlm_put_lkb(lkb))
5343 log_error(ls, "purged mstcpy lkb not released");
5344 }
5345 }
5346
5347 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
5348 {
5349 struct dlm_ls *ls = r->res_ls;
5350
5351 purge_mstcpy_list(ls, r, &r->res_grantqueue);
5352 purge_mstcpy_list(ls, r, &r->res_convertqueue);
5353 purge_mstcpy_list(ls, r, &r->res_waitqueue);
5354 }
5355
5356 static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
5357 struct list_head *list,
5358 int nodeid_gone, unsigned int *count)
5359 {
5360 struct dlm_lkb *lkb, *safe;
5361
5362 list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
5363 if (!is_master_copy(lkb))
5364 continue;
5365
5366 if ((lkb->lkb_nodeid == nodeid_gone) ||
5367 dlm_is_removed(ls, lkb->lkb_nodeid)) {
5368
5369 /* tell recover_lvb to invalidate the lvb
5370 because a node holding EX/PW failed */
5371 if ((lkb->lkb_exflags & DLM_LKF_VALBLK) &&
5372 (lkb->lkb_grmode >= DLM_LOCK_PW)) {
5373 rsb_set_flag(r, RSB_RECOVER_LVB_INVAL);
5374 }
5375
5376 del_lkb(r, lkb);
5377
5378 /* this put should free the lkb */
5379 if (!dlm_put_lkb(lkb))
5380 log_error(ls, "purged dead lkb not released");
5381
5382 rsb_set_flag(r, RSB_RECOVER_GRANT);
5383
5384 (*count)++;
5385 }
5386 }
5387 }
5388
5389 /* Get rid of locks held by nodes that are gone. */
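/* Master-copy lkbs owned by departed nodes are removed from each rsb we
   master; RECOVER_GRANT is set on those rsbs so dlm_recover_grant() can
   later try to grant the locks that were blocked behind them. */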
5390
5391 void dlm_recover_purge(struct dlm_ls *ls, const struct list_head *root_list)
5392 {
5393 struct dlm_rsb *r;
5394 struct dlm_member *memb;
5395 int nodes_count = 0;
5396 int nodeid_gone = 0;
5397 unsigned int lkb_count = 0;
5398
5399 /* cache one removed nodeid to optimize the common
5400 case of a single node removed */
5401
5402 list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
5403 nodes_count++;
5404 nodeid_gone = memb->nodeid;
5405 }
5406
5407 if (!nodes_count)
5408 return;
5409
5410 list_for_each_entry(r, root_list, res_root_list) {
5411 lock_rsb(r);
5412 if (r->res_nodeid != -1 && is_master(r)) {
5413 purge_dead_list(ls, r, &r->res_grantqueue,
5414 nodeid_gone, &lkb_count);
5415 purge_dead_list(ls, r, &r->res_convertqueue,
5416 nodeid_gone, &lkb_count);
5417 purge_dead_list(ls, r, &r->res_waitqueue,
5418 nodeid_gone, &lkb_count);
5419 }
5420 unlock_rsb(r);
5421
5422 cond_resched();
5423 }
5424
5425 if (lkb_count)
5426 log_rinfo(ls, "dlm_recover_purge %u locks for %u nodes",
5427 lkb_count, nodes_count);
5428 }
5429
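/* Find one rsb with RECOVER_GRANT set that we are the master of, take a
   reference on it, and return it.  The flag is cleared here for rsbs we no
   longer master; the rsbtbl lock is dropped before returning so the caller
   can lock_rsb() without holding it. */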
5430 static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls)
5431 {
5432 struct dlm_rsb *r;
5433
5434 read_lock_bh(&ls->ls_rsbtbl_lock);
5435 list_for_each_entry(r, &ls->ls_slow_active, res_slow_list) {
5436 if (!rsb_flag(r, RSB_RECOVER_GRANT))
5437 continue;
5438 if (!is_master(r)) {
5439 rsb_clear_flag(r, RSB_RECOVER_GRANT);
5440 continue;
5441 }
5442 hold_rsb(r);
5443 read_unlock_bh(&ls->ls_rsbtbl_lock);
5444 return r;
5445 }
5446 read_unlock_bh(&ls->ls_rsbtbl_lock);
5447 return NULL;
5448 }
5449
5450 /*
5451 * Attempt to grant locks on resources that we are the master of.
5452 * Locks may have become grantable during recovery because locks
5453 * from departed nodes have been purged (or not rebuilt), allowing
5454 * previously blocked locks to now be granted. The subset of rsb's
5455 * we are interested in are those with lkb's on either the convert or
5456 * waiting queues.
5457 *
5458 * Simplest would be to go through each master rsb and check for non-empty
5459 * convert or waiting queues, and attempt to grant on those rsbs.
5460 * Checking the queues requires lock_rsb, though, for which we'd need
5461 * to release the rsbtbl lock. This would make iterating through all
5462 * rsb's very inefficient. So, we rely on earlier recovery routines
5463 * to set RECOVER_GRANT on any rsb's that we should attempt to grant
5464 * locks for.
5465 */
5466
5467 void dlm_recover_grant(struct dlm_ls *ls)
5468 {
5469 struct dlm_rsb *r;
5470 unsigned int count = 0;
5471 unsigned int rsb_count = 0;
5472 unsigned int lkb_count = 0;
5473
5474 while (1) {
5475 r = find_grant_rsb(ls);
5476 if (!r)
5477 break;
5478
5479 rsb_count++;
5480 count = 0;
5481 lock_rsb(r);
5482 /* the RECOVER_GRANT flag is checked in the grant path */
5483 grant_pending_locks(r, &count);
5484 rsb_clear_flag(r, RSB_RECOVER_GRANT);
5485 lkb_count += count;
5486 confirm_master(r, 0);
5487 unlock_rsb(r);
5488 put_rsb(r);
5489 cond_resched();
5490 }
5491
5492 if (lkb_count)
5493 log_rinfo(ls, "dlm_recover_grant %u locks on %u resources",
5494 lkb_count, rsb_count);
5495 }
5496
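/* Find an lkb on the given queue by the owning node's id and that node's
   lock id (remid); search_remid() below checks the grant, convert and wait
   queues in turn. */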
5497 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
5498 uint32_t remid)
5499 {
5500 struct dlm_lkb *lkb;
5501
5502 list_for_each_entry(lkb, head, lkb_statequeue) {
5503 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
5504 return lkb;
5505 }
5506 return NULL;
5507 }
5508
5509 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
5510 uint32_t remid)
5511 {
5512 struct dlm_lkb *lkb;
5513
5514 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
5515 if (lkb)
5516 return lkb;
5517 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
5518 if (lkb)
5519 return lkb;
5520 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
5521 if (lkb)
5522 return lkb;
5523 return NULL;
5524 }
5525
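/* Fill a newly created master-copy lkb from the rcom_lock fields sent by
   the lock holder, including the lvb when DLM_LKF_VALBLK is set.  The
   ast/bast callbacks are replaced with the fake_* stubs (when the holder
   had callbacks) since the real functions only exist on the holder's node. */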
5526 /* needs at least dlm_rcom + rcom_lock */
5527 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
5528 struct dlm_rsb *r, const struct dlm_rcom *rc)
5529 {
5530 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5531
5532 lkb->lkb_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5533 lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
5534 lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
5535 lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
5536 dlm_set_dflags_val(lkb, le32_to_cpu(rl->rl_flags));
5537 set_bit(DLM_IFL_MSTCPY_BIT, &lkb->lkb_iflags);
5538 lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
5539 lkb->lkb_rqmode = rl->rl_rqmode;
5540 lkb->lkb_grmode = rl->rl_grmode;
5541 	/* don't set lkb_status because add_lkb wants to set it itself */
5542
5543 lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
5544 lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
5545
5546 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
5547 int lvblen = le16_to_cpu(rc->rc_header.h_length) -
5548 sizeof(struct dlm_rcom) - sizeof(struct rcom_lock);
5549 if (lvblen > ls->ls_lvblen)
5550 return -EINVAL;
5551 lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
5552 if (!lkb->lkb_lvbptr)
5553 return -ENOMEM;
5554 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
5555 }
5556
5557 /* Conversions between PR and CW (middle modes) need special handling.
5558 The real granted mode of these converting locks cannot be determined
5559 until all locks have been rebuilt on the rsb (recover_conversion) */
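	/* (PR and CW are mutually incompatible, so a single converting lkb
	   cannot tell us its true granted mode here; recover_conversion()
	   resolves it once every lock on the rsb has been rebuilt.) */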
5560
5561 if (rl->rl_status == DLM_LKSTS_CONVERT && middle_conversion(lkb)) {
5562 /* We may need to adjust grmode depending on other granted locks. */
5563 log_rinfo(ls, "%s %x middle convert gr %d rq %d remote %d %x",
5564 __func__, lkb->lkb_id, lkb->lkb_grmode,
5565 lkb->lkb_rqmode, lkb->lkb_nodeid, lkb->lkb_remid);
5566 rsb_set_flag(r, RSB_RECOVER_CONVERT);
5567 }
5568
5569 return 0;
5570 }
5571
5572 /* This lkb may have been recovered in a previous aborted recovery so we need
5573 to check if the rsb already has an lkb with the given remote nodeid/lkid.
5574 If so we just send back a standard reply. If not, we create a new lkb with
5575 the given values and send back our lkid. We send back our lkid by sending
5576 back the rcom_lock struct we got but with the remid field filled in. */
5577
5578 /* needs at least dlm_rcom + rcom_lock */
5579 int dlm_recover_master_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5580 __le32 *rl_remid, __le32 *rl_result)
5581 {
5582 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5583 struct dlm_rsb *r;
5584 struct dlm_lkb *lkb;
5585 uint32_t remid = 0;
5586 int from_nodeid = le32_to_cpu(rc->rc_header.h_nodeid);
5587 int error;
5588
5589 /* init rl_remid with rcom lock rl_remid */
5590 *rl_remid = rl->rl_remid;
5591
5592 if (rl->rl_parent_lkid) {
5593 error = -EOPNOTSUPP;
5594 goto out;
5595 }
5596
5597 remid = le32_to_cpu(rl->rl_lkid);
5598
5599 /* In general we expect the rsb returned to be R_MASTER, but we don't
5600 have to require it. Recovery of masters on one node can overlap
5601 recovery of locks on another node, so one node can send us MSTCPY
5602 locks before we've made ourselves master of this rsb. We can still
5603 add new MSTCPY locks that we receive here without any harm; when
5604 we make ourselves master, dlm_recover_masters() won't touch the
5605 MSTCPY locks we've received early. */
5606
5607 error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
5608 from_nodeid, R_RECEIVE_RECOVER, &r);
5609 if (error)
5610 goto out;
5611
5612 lock_rsb(r);
5613
5614 if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
5615 log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
5616 from_nodeid, remid);
5617 error = -EBADR;
5618 goto out_unlock;
5619 }
5620
5621 lkb = search_remid(r, from_nodeid, remid);
5622 if (lkb) {
5623 error = -EEXIST;
5624 goto out_remid;
5625 }
5626
5627 error = create_lkb(ls, &lkb);
5628 if (error)
5629 goto out_unlock;
5630
5631 error = receive_rcom_lock_args(ls, lkb, r, rc);
5632 if (error) {
5633 __put_lkb(ls, lkb);
5634 goto out_unlock;
5635 }
5636
5637 attach_lkb(r, lkb);
5638 add_lkb(r, lkb, rl->rl_status);
5639 ls->ls_recover_locks_in++;
5640
5641 if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
5642 rsb_set_flag(r, RSB_RECOVER_GRANT);
5643
5644 out_remid:
5645 /* this is the new value returned to the lock holder for
5646 saving in its process-copy lkb */
5647 *rl_remid = cpu_to_le32(lkb->lkb_id);
5648
5649 lkb->lkb_recover_seq = ls->ls_recover_seq;
5650
5651 out_unlock:
5652 unlock_rsb(r);
5653 put_rsb(r);
5654 out:
5655 if (error && error != -EEXIST)
5656 log_rinfo(ls, "dlm_recover_master_copy remote %d %x error %d",
5657 from_nodeid, remid, error);
5658 *rl_result = cpu_to_le32(error);
5659 return error;
5660 }
5661
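/* Handle the master's reply to a lock we sent it with dlm_send_rcom_lock():
   on success (or -EEXIST) record the master's lkid in lkb_remid; on -EBADR
   the master did not know the rsb yet, so resend the lock. */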
5662 /* needs at least dlm_rcom + rcom_lock */
5663 int dlm_recover_process_copy(struct dlm_ls *ls, const struct dlm_rcom *rc,
5664 uint64_t seq)
5665 {
5666 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
5667 struct dlm_rsb *r;
5668 struct dlm_lkb *lkb;
5669 uint32_t lkid, remid;
5670 int error, result;
5671
5672 lkid = le32_to_cpu(rl->rl_lkid);
5673 remid = le32_to_cpu(rl->rl_remid);
5674 result = le32_to_cpu(rl->rl_result);
5675
5676 error = find_lkb(ls, lkid, &lkb);
5677 if (error) {
5678 log_error(ls, "dlm_recover_process_copy no %x remote %d %x %d",
5679 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5680 result);
5681 return error;
5682 }
5683
5684 r = lkb->lkb_resource;
5685 hold_rsb(r);
5686 lock_rsb(r);
5687
5688 if (!is_process_copy(lkb)) {
5689 log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
5690 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5691 result);
5692 dlm_dump_rsb(r);
5693 unlock_rsb(r);
5694 put_rsb(r);
5695 dlm_put_lkb(lkb);
5696 return -EINVAL;
5697 }
5698
5699 switch (result) {
5700 case -EBADR:
5701 /* There's a chance the new master received our lock before
5702 		   dlm_recover_master_reply(); this wouldn't happen if we did
5703 a barrier between recover_masters and recover_locks. */
5704
5705 log_debug(ls, "dlm_recover_process_copy %x remote %d %x %d",
5706 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5707 result);
5708
5709 dlm_send_rcom_lock(r, lkb, seq);
5710 goto out;
5711 case -EEXIST:
5712 case 0:
5713 lkb->lkb_remid = remid;
5714 break;
5715 default:
5716 log_error(ls, "dlm_recover_process_copy %x remote %d %x %d unk",
5717 lkid, le32_to_cpu(rc->rc_header.h_nodeid), remid,
5718 result);
5719 }
5720
5721 /* an ack for dlm_recover_locks() which waits for replies from
5722 all the locks it sends to new masters */
5723 dlm_recovered_lock(r);
5724 out:
5725 unlock_rsb(r);
5726 put_rsb(r);
5727 dlm_put_lkb(lkb);
5728
5729 return 0;
5730 }
5731
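/* Lock requests arriving from the userspace device interface.  The lkb is
   marked with DLM_DFL_USER_BIT and, once the request is underway, a
   reference is held for the owning process's proc->locks list. */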
5732 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
5733 int mode, uint32_t flags, void *name, unsigned int namelen)
5734 {
5735 struct dlm_lkb *lkb;
5736 struct dlm_args args;
5737 bool do_put = true;
5738 int error;
5739
5740 dlm_lock_recovery(ls);
5741
5742 error = create_lkb(ls, &lkb);
5743 if (error) {
5744 kfree(ua);
5745 goto out;
5746 }
5747
5748 trace_dlm_lock_start(ls, lkb, name, namelen, mode, flags);
5749
5750 if (flags & DLM_LKF_VALBLK) {
5751 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5752 if (!ua->lksb.sb_lvbptr) {
5753 kfree(ua);
5754 error = -ENOMEM;
5755 goto out_put;
5756 }
5757 }
5758 error = set_lock_args(mode, &ua->lksb, flags, namelen, fake_astfn, ua,
5759 fake_bastfn, &args);
5760 if (error) {
5761 kfree(ua->lksb.sb_lvbptr);
5762 ua->lksb.sb_lvbptr = NULL;
5763 kfree(ua);
5764 goto out_put;
5765 }
5766
5767 /* After ua is attached to lkb it will be freed by dlm_free_lkb().
5768 When DLM_DFL_USER_BIT is set, the dlm knows that this is a userspace
5769 lock and that lkb_astparam is the dlm_user_args structure. */
5770 set_bit(DLM_DFL_USER_BIT, &lkb->lkb_dflags);
5771 error = request_lock(ls, lkb, name, namelen, &args);
5772
5773 switch (error) {
5774 case 0:
5775 break;
5776 case -EINPROGRESS:
5777 error = 0;
5778 break;
5779 case -EAGAIN:
5780 error = 0;
5781 fallthrough;
5782 default:
5783 goto out_put;
5784 }
5785
5786 /* add this new lkb to the per-process list of locks */
5787 spin_lock_bh(&ua->proc->locks_spin);
5788 hold_lkb(lkb);
5789 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5790 spin_unlock_bh(&ua->proc->locks_spin);
5791 do_put = false;
5792 out_put:
5793 trace_dlm_lock_end(ls, lkb, name, namelen, mode, flags, error, false);
5794 if (do_put)
5795 __put_lkb(ls, lkb);
5796 out:
5797 dlm_unlock_recovery(ls);
5798 return error;
5799 }
5800
5801 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5802 int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
5803 {
5804 struct dlm_lkb *lkb;
5805 struct dlm_args args;
5806 struct dlm_user_args *ua;
5807 int error;
5808
5809 dlm_lock_recovery(ls);
5810
5811 error = find_lkb(ls, lkid, &lkb);
5812 if (error)
5813 goto out;
5814
5815 trace_dlm_lock_start(ls, lkb, NULL, 0, mode, flags);
5816
5817 /* user can change the params on its lock when it converts it, or
5818 add an lvb that didn't exist before */
5819
5820 ua = lkb->lkb_ua;
5821
5822 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
5823 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
5824 if (!ua->lksb.sb_lvbptr) {
5825 error = -ENOMEM;
5826 goto out_put;
5827 }
5828 }
5829 if (lvb_in && ua->lksb.sb_lvbptr)
5830 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5831
5832 ua->xid = ua_tmp->xid;
5833 ua->castparam = ua_tmp->castparam;
5834 ua->castaddr = ua_tmp->castaddr;
5835 ua->bastparam = ua_tmp->bastparam;
5836 ua->bastaddr = ua_tmp->bastaddr;
5837 ua->user_lksb = ua_tmp->user_lksb;
5838
5839 error = set_lock_args(mode, &ua->lksb, flags, 0, fake_astfn, ua,
5840 fake_bastfn, &args);
5841 if (error)
5842 goto out_put;
5843
5844 error = convert_lock(ls, lkb, &args);
5845
5846 if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
5847 error = 0;
5848 out_put:
5849 trace_dlm_lock_end(ls, lkb, NULL, 0, mode, flags, error, false);
5850 dlm_put_lkb(lkb);
5851 out:
5852 dlm_unlock_recovery(ls);
5853 kfree(ua_tmp);
5854 return error;
5855 }
5856
5857 /*
5858 * The caller asks for an orphan lock on a given resource with a given mode.
5859 * If a matching lock exists, it's moved to the owner's list of locks and
5860 * the lkid is returned.
5861 */
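/* Returns -EAGAIN if an orphan on this resource exists only with a different
   granted mode, and -ENOENT when no orphan matches at all. */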
5862
5863 int dlm_user_adopt_orphan(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5864 int mode, uint32_t flags, void *name, unsigned int namelen,
5865 uint32_t *lkid)
5866 {
5867 struct dlm_lkb *lkb = NULL, *iter;
5868 struct dlm_user_args *ua;
5869 int found_other_mode = 0;
5870 int rv = 0;
5871
5872 spin_lock_bh(&ls->ls_orphans_lock);
5873 list_for_each_entry(iter, &ls->ls_orphans, lkb_ownqueue) {
5874 if (iter->lkb_resource->res_length != namelen)
5875 continue;
5876 if (memcmp(iter->lkb_resource->res_name, name, namelen))
5877 continue;
5878 if (iter->lkb_grmode != mode) {
5879 found_other_mode = 1;
5880 continue;
5881 }
5882
5883 lkb = iter;
5884 list_del_init(&iter->lkb_ownqueue);
5885 clear_bit(DLM_DFL_ORPHAN_BIT, &iter->lkb_dflags);
5886 *lkid = iter->lkb_id;
5887 break;
5888 }
5889 spin_unlock_bh(&ls->ls_orphans_lock);
5890
5891 if (!lkb && found_other_mode) {
5892 rv = -EAGAIN;
5893 goto out;
5894 }
5895
5896 if (!lkb) {
5897 rv = -ENOENT;
5898 goto out;
5899 }
5900
5901 lkb->lkb_exflags = flags;
5902 lkb->lkb_ownpid = (int) current->pid;
5903
5904 ua = lkb->lkb_ua;
5905
5906 ua->proc = ua_tmp->proc;
5907 ua->xid = ua_tmp->xid;
5908 ua->castparam = ua_tmp->castparam;
5909 ua->castaddr = ua_tmp->castaddr;
5910 ua->bastparam = ua_tmp->bastparam;
5911 ua->bastaddr = ua_tmp->bastaddr;
5912 ua->user_lksb = ua_tmp->user_lksb;
5913
5914 /*
5915 * The lkb reference from the ls_orphans list was not
5916 * removed above, and is now considered the reference
5917 * for the proc locks list.
5918 */
5919
5920 spin_lock_bh(&ua->proc->locks_spin);
5921 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
5922 spin_unlock_bh(&ua->proc->locks_spin);
5923 out:
5924 kfree(ua_tmp);
5925 return rv;
5926 }
5927
5928 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5929 uint32_t flags, uint32_t lkid, char *lvb_in)
5930 {
5931 struct dlm_lkb *lkb;
5932 struct dlm_args args;
5933 struct dlm_user_args *ua;
5934 int error;
5935
5936 dlm_lock_recovery(ls);
5937
5938 error = find_lkb(ls, lkid, &lkb);
5939 if (error)
5940 goto out;
5941
5942 trace_dlm_unlock_start(ls, lkb, flags);
5943
5944 ua = lkb->lkb_ua;
5945
5946 if (lvb_in && ua->lksb.sb_lvbptr)
5947 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
5948 if (ua_tmp->castparam)
5949 ua->castparam = ua_tmp->castparam;
5950 ua->user_lksb = ua_tmp->user_lksb;
5951
5952 error = set_unlock_args(flags, ua, &args);
5953 if (error)
5954 goto out_put;
5955
5956 error = unlock_lock(ls, lkb, &args);
5957
5958 if (error == -DLM_EUNLOCK)
5959 error = 0;
5960 /* from validate_unlock_args() */
5961 if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
5962 error = 0;
5963 if (error)
5964 goto out_put;
5965
5966 spin_lock_bh(&ua->proc->locks_spin);
5967 /* dlm_user_add_cb() may have already taken lkb off the proc list */
5968 if (!list_empty(&lkb->lkb_ownqueue))
5969 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
5970 spin_unlock_bh(&ua->proc->locks_spin);
5971 out_put:
5972 trace_dlm_unlock_end(ls, lkb, flags, error);
5973 dlm_put_lkb(lkb);
5974 out:
5975 dlm_unlock_recovery(ls);
5976 kfree(ua_tmp);
5977 return error;
5978 }
5979
5980 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
5981 uint32_t flags, uint32_t lkid)
5982 {
5983 struct dlm_lkb *lkb;
5984 struct dlm_args args;
5985 struct dlm_user_args *ua;
5986 int error;
5987
5988 dlm_lock_recovery(ls);
5989
5990 error = find_lkb(ls, lkid, &lkb);
5991 if (error)
5992 goto out;
5993
5994 trace_dlm_unlock_start(ls, lkb, flags);
5995
5996 ua = lkb->lkb_ua;
5997 if (ua_tmp->castparam)
5998 ua->castparam = ua_tmp->castparam;
5999 ua->user_lksb = ua_tmp->user_lksb;
6000
6001 error = set_unlock_args(flags, ua, &args);
6002 if (error)
6003 goto out_put;
6004
6005 error = cancel_lock(ls, lkb, &args);
6006
6007 if (error == -DLM_ECANCEL)
6008 error = 0;
6009 /* from validate_unlock_args() */
6010 if (error == -EBUSY)
6011 error = 0;
6012 out_put:
6013 trace_dlm_unlock_end(ls, lkb, flags, error);
6014 dlm_put_lkb(lkb);
6015 out:
6016 dlm_unlock_recovery(ls);
6017 kfree(ua_tmp);
6018 return error;
6019 }
6020
6021 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
6022 {
6023 struct dlm_lkb *lkb;
6024 struct dlm_args args;
6025 struct dlm_user_args *ua;
6026 struct dlm_rsb *r;
6027 int error;
6028
6029 dlm_lock_recovery(ls);
6030
6031 error = find_lkb(ls, lkid, &lkb);
6032 if (error)
6033 goto out;
6034
6035 trace_dlm_unlock_start(ls, lkb, flags);
6036
6037 ua = lkb->lkb_ua;
6038
6039 error = set_unlock_args(flags, ua, &args);
6040 if (error)
6041 goto out_put;
6042
6043 /* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
6044
6045 r = lkb->lkb_resource;
6046 hold_rsb(r);
6047 lock_rsb(r);
6048
6049 error = validate_unlock_args(lkb, &args);
6050 if (error)
6051 goto out_r;
6052 set_bit(DLM_IFL_DEADLOCK_CANCEL_BIT, &lkb->lkb_iflags);
6053
6054 error = _cancel_lock(r, lkb);
6055 out_r:
6056 unlock_rsb(r);
6057 put_rsb(r);
6058
6059 if (error == -DLM_ECANCEL)
6060 error = 0;
6061 /* from validate_unlock_args() */
6062 if (error == -EBUSY)
6063 error = 0;
6064 out_put:
6065 trace_dlm_unlock_end(ls, lkb, flags, error);
6066 dlm_put_lkb(lkb);
6067 out:
6068 dlm_unlock_recovery(ls);
6069 return error;
6070 }
6071
6072 /* lkb's that are removed from the waiters list by revert are just left on the
6073 orphans list with the granted orphan locks, to be freed by purge */
6074
6075 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6076 {
6077 struct dlm_args args;
6078 int error;
6079
6080 hold_lkb(lkb); /* reference for the ls_orphans list */
6081 spin_lock_bh(&ls->ls_orphans_lock);
6082 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
6083 spin_unlock_bh(&ls->ls_orphans_lock);
6084
6085 set_unlock_args(0, lkb->lkb_ua, &args);
6086
6087 error = cancel_lock(ls, lkb, &args);
6088 if (error == -DLM_ECANCEL)
6089 error = 0;
6090 return error;
6091 }
6092
6093 /* The FORCEUNLOCK flag allows the unlock to go ahead even if the lkb isn't
6094 granted. Regardless of what rsb queue the lock is on, it's removed and
6095 freed. The IVVALBLK flag causes the lvb on the resource to be invalidated
6096 if our lock is PW/EX (it's ignored if our granted mode is smaller.) */
6097
6098 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
6099 {
6100 struct dlm_args args;
6101 int error;
6102
6103 set_unlock_args(DLM_LKF_FORCEUNLOCK | DLM_LKF_IVVALBLK,
6104 lkb->lkb_ua, &args);
6105
6106 error = unlock_lock(ls, lkb, &args);
6107 if (error == -DLM_EUNLOCK)
6108 error = 0;
6109 return error;
6110 }
6111
6112 /* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
6113 (which does lock_rsb) due to deadlock with receiving a message that does
6114 lock_rsb followed by dlm_user_add_cb() */
6115
6116 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
6117 struct dlm_user_proc *proc)
6118 {
6119 struct dlm_lkb *lkb = NULL;
6120
6121 spin_lock_bh(&ls->ls_clear_proc_locks);
6122 if (list_empty(&proc->locks))
6123 goto out;
6124
6125 lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
6126 list_del_init(&lkb->lkb_ownqueue);
6127
6128 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6129 set_bit(DLM_DFL_ORPHAN_BIT, &lkb->lkb_dflags);
6130 else
6131 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6132 out:
6133 spin_unlock_bh(&ls->ls_clear_proc_locks);
6134 return lkb;
6135 }
6136
6137 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
6138 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
6139 which we clear here. */
6140
6141 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
6142 list, and no more device_writes should add lkb's to proc->locks list; so we
6143 shouldn't need to take asts_spin or locks_spin here. This assumes that
6144 device reads/writes/closes are serialized -- FIXME: we may need to serialize
6145 them ourselves. */
6146
6147 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6148 {
6149 struct dlm_callback *cb, *cb_safe;
6150 struct dlm_lkb *lkb, *safe;
6151
6152 dlm_lock_recovery(ls);
6153
6154 while (1) {
6155 lkb = del_proc_lock(ls, proc);
6156 if (!lkb)
6157 break;
6158 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
6159 orphan_proc_lock(ls, lkb);
6160 else
6161 unlock_proc_lock(ls, lkb);
6162
6163 /* this removes the reference for the proc->locks list
6164 		   added by dlm_user_request; it may result in the lkb
6165 being freed */
6166
6167 dlm_put_lkb(lkb);
6168 }
6169
6170 spin_lock_bh(&ls->ls_clear_proc_locks);
6171
6172 /* in-progress unlocks */
6173 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6174 list_del_init(&lkb->lkb_ownqueue);
6175 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6176 dlm_put_lkb(lkb);
6177 }
6178
6179 list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6180 list_del(&cb->list);
6181 dlm_free_cb(cb);
6182 }
6183
6184 spin_unlock_bh(&ls->ls_clear_proc_locks);
6185 dlm_unlock_recovery(ls);
6186 }
6187
6188 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
6189 {
6190 struct dlm_callback *cb, *cb_safe;
6191 struct dlm_lkb *lkb, *safe;
6192
6193 while (1) {
6194 lkb = NULL;
6195 spin_lock_bh(&proc->locks_spin);
6196 if (!list_empty(&proc->locks)) {
6197 lkb = list_entry(proc->locks.next, struct dlm_lkb,
6198 lkb_ownqueue);
6199 list_del_init(&lkb->lkb_ownqueue);
6200 }
6201 spin_unlock_bh(&proc->locks_spin);
6202
6203 if (!lkb)
6204 break;
6205
6206 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6207 unlock_proc_lock(ls, lkb);
6208 dlm_put_lkb(lkb); /* ref from proc->locks list */
6209 }
6210
6211 spin_lock_bh(&proc->locks_spin);
6212 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
6213 list_del_init(&lkb->lkb_ownqueue);
6214 set_bit(DLM_IFL_DEAD_BIT, &lkb->lkb_iflags);
6215 dlm_put_lkb(lkb);
6216 }
6217 spin_unlock_bh(&proc->locks_spin);
6218
6219 spin_lock_bh(&proc->asts_spin);
6220 list_for_each_entry_safe(cb, cb_safe, &proc->asts, list) {
6221 list_del(&cb->list);
6222 dlm_free_cb(cb);
6223 }
6224 spin_unlock_bh(&proc->asts_spin);
6225 }
6226
6227 /* pid of 0 means purge all orphans */
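/* (only lkb_ownpid is matched; the nodeid argument is not used here) */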
6228
6229 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
6230 {
6231 struct dlm_lkb *lkb, *safe;
6232
6233 spin_lock_bh(&ls->ls_orphans_lock);
6234 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
6235 if (pid && lkb->lkb_ownpid != pid)
6236 continue;
6237 unlock_proc_lock(ls, lkb);
6238 list_del_init(&lkb->lkb_ownqueue);
6239 dlm_put_lkb(lkb);
6240 }
6241 spin_unlock_bh(&ls->ls_orphans_lock);
6242 }
6243
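/* Ask the node that holds the orphans to purge them: build a DLM_MSG_PURGE
   message carrying the nodeid/pid and send it; the receiving node handles
   it by calling do_purge() with those values. */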
6244 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
6245 {
6246 struct dlm_message *ms;
6247 struct dlm_mhandle *mh;
6248 int error;
6249
6250 error = _create_message(ls, sizeof(struct dlm_message), nodeid,
6251 DLM_MSG_PURGE, &ms, &mh);
6252 if (error)
6253 return error;
6254 ms->m_nodeid = cpu_to_le32(nodeid);
6255 ms->m_pid = cpu_to_le32(pid);
6256
6257 return send_message(mh, ms, NULL, 0);
6258 }
6259
6260 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
6261 int nodeid, int pid)
6262 {
6263 int error = 0;
6264
6265 if (nodeid && (nodeid != dlm_our_nodeid())) {
6266 error = send_purge(ls, nodeid, pid);
6267 } else {
6268 dlm_lock_recovery(ls);
6269 if (pid == current->pid)
6270 purge_proc_locks(ls, proc);
6271 else
6272 do_purge(ls, nodeid, pid);
6273 dlm_unlock_recovery(ls);
6274 }
6275 return error;
6276 }
6277
6278 /* debug functionality */
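/* Create an lkb with a caller-chosen id and attach it to the named rsb on
   the requested queue (lkb_status); only kernel (non-user) locks can be
   injected this way. */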
6279 int dlm_debug_add_lkb(struct dlm_ls *ls, uint32_t lkb_id, char *name, int len,
6280 int lkb_nodeid, unsigned int lkb_dflags, int lkb_status)
6281 {
6282 struct dlm_lksb *lksb;
6283 struct dlm_lkb *lkb;
6284 struct dlm_rsb *r;
6285 int error;
6286
6287 /* we currently can't set a valid user lock */
6288 if (lkb_dflags & BIT(DLM_DFL_USER_BIT))
6289 return -EOPNOTSUPP;
6290
6291 lksb = kzalloc_obj(*lksb, GFP_NOFS);
6292 if (!lksb)
6293 return -ENOMEM;
6294
6295 error = _create_lkb(ls, &lkb, lkb_id, lkb_id + 1);
6296 if (error) {
6297 kfree(lksb);
6298 return error;
6299 }
6300
6301 dlm_set_dflags_val(lkb, lkb_dflags);
6302 lkb->lkb_nodeid = lkb_nodeid;
6303 lkb->lkb_lksb = lksb;
6304 	/* user-specific pointer; just don't leave it NULL for kernel locks */
6305 if (~lkb_dflags & BIT(DLM_DFL_USER_BIT))
6306 lkb->lkb_astparam = (void *)0xDEADBEEF;
6307
6308 error = find_rsb(ls, name, len, 0, R_REQUEST, &r);
6309 if (error) {
6310 kfree(lksb);
6311 __put_lkb(ls, lkb);
6312 return error;
6313 }
6314
6315 lock_rsb(r);
6316 attach_lkb(r, lkb);
6317 add_lkb(r, lkb, lkb_status);
6318 unlock_rsb(r);
6319 put_rsb(r);
6320
6321 return 0;
6322 }
6323
6324 int dlm_debug_add_lkb_to_waiters(struct dlm_ls *ls, uint32_t lkb_id,
6325 int mstype, int to_nodeid)
6326 {
6327 struct dlm_lkb *lkb;
6328 int error;
6329
6330 error = find_lkb(ls, lkb_id, &lkb);
6331 if (error)
6332 return error;
6333
6334 add_to_waiters(lkb, mstype, to_nodeid);
6335 dlm_put_lkb(lkb);
6336 return 0;
6337 }
6338
6339