1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12 
13 /* Central locking logic has four stages:
14 
15    dlm_lock()
16    dlm_unlock()
17 
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22 
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27 
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32 
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35 
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40 
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43 
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46 
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49 
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53 
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
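/* A minimal caller sketch, illustrative only -- the prototypes below are
   the ones declared in linux/dlm.h; check that header for the
   authoritative signatures.  The flags select which of the four main
   operations stage 1 dispatches to:

   dlm_lock(ls, DLM_LOCK_EX, &lksb, 0, name, len, 0, ast, arg, bast)
						-> request_lock()
   dlm_lock(ls, DLM_LOCK_PR, &lksb, DLM_LKF_CONVERT, name, len, 0,
	    ast, arg, bast)			-> convert_lock()
   dlm_unlock(ls, lkid, 0, &lksb, arg)		-> unlock_lock()
   dlm_unlock(ls, lkid, DLM_LKF_CANCEL, &lksb, arg)
						-> cancel_lock()
*/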
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
61 #include "memory.h"
62 #include "lowcomms.h"
63 #include "requestqueue.h"
64 #include "util.h"
65 #include "dir.h"
66 #include "member.h"
67 #include "lockspace.h"
68 #include "ast.h"
69 #include "lock.h"
70 #include "rcom.h"
71 #include "recover.h"
72 #include "lvb_table.h"
73 #include "user.h"
74 #include "config.h"
75 
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
86 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
87 				    struct dlm_message *ms);
88 static int receive_extralen(struct dlm_message *ms);
89 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
90 static void del_timeout(struct dlm_lkb *lkb);
91 
92 /*
93  * Lock compatibility matrix - thanks Steve
94  * UN = Unlocked state. Not really a state, used as a flag
95  * PD = Padding. Used to make the matrix a nice power of two in size
96  * Other states are the same as the VMS DLM.
97  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
98  */
99 
100 static const int __dlm_compat_matrix[8][8] = {
101       /* UN NL CR CW PR PW EX PD */
102         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
103         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
104         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
105         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
106         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
107         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
108         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
109         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
110 };
111 
112 /*
113  * This defines the direction of transfer of LVB data.
114  * Granted mode is the row; requested mode is the column.
115  * Usage: matrix[grmode+1][rqmode+1]
116  * 1 = LVB is returned to the caller
117  * 0 = LVB is written to the resource
118  * -1 = nothing happens to the LVB
119  */
120 
121 const int dlm_lvb_operations[8][8] = {
122         /* UN   NL  CR  CW  PR  PW  EX  PD*/
123         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
124         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
125         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
126         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
127         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
128         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
129         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
130         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
131 };
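/* Two concrete readings of the table above, for illustration:
   dlm_lvb_operations[DLM_LOCK_IV+1][DLM_LOCK_EX+1] == 1: a new EX
   request returns the resource's lvb to the caller.
   dlm_lvb_operations[DLM_LOCK_EX+1][DLM_LOCK_NL+1] == 0: an EX->NL
   down-conversion writes the caller's lvb into the resource. */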
132 
133 #define modes_compat(gr, rq) \
134 	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
135 
136 int dlm_modes_compat(int mode1, int mode2)
137 {
138 	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
139 }
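/* Illustrative sketch only (nothing in the dlm calls this): a few
   concrete readings of the compat matrix via dlm_modes_compat().
   The DLM_LOCK_* constants come from linux/dlm.h. */

static inline void __example_modes_compat(void)
{
	WARN_ON(!dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR)); /* PR/PR: compatible */
	WARN_ON(dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_EX));  /* PR/EX: conflict */
	WARN_ON(dlm_modes_compat(DLM_LOCK_CW, DLM_LOCK_PR));  /* CW/PR: conflict */
}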
140 
141 /*
142  * Compatibility matrix for conversions with QUECVT set.
143  * Granted mode is the row; requested mode is the column.
144  * Usage: matrix[grmode+1][rqmode+1]
145  */
146 
147 static const int __quecvt_compat_matrix[8][8] = {
148       /* UN NL CR CW PR PW EX PD */
149         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
150         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
151         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
152         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
153         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
154         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
155         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
156         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
157 };
158 
159 void dlm_print_lkb(struct dlm_lkb *lkb)
160 {
161 	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
162 	       "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
163 	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
164 	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
165 	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
166 }
167 
168 static void dlm_print_rsb(struct dlm_rsb *r)
169 {
170 	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
171 	       r->res_nodeid, r->res_flags, r->res_first_lkid,
172 	       r->res_recover_locks_count, r->res_name);
173 }
174 
175 void dlm_dump_rsb(struct dlm_rsb *r)
176 {
177 	struct dlm_lkb *lkb;
178 
179 	dlm_print_rsb(r);
180 
181 	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
182 	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
183 	printk(KERN_ERR "rsb lookup list\n");
184 	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
185 		dlm_print_lkb(lkb);
186 	printk(KERN_ERR "rsb grant queue:\n");
187 	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
188 		dlm_print_lkb(lkb);
189 	printk(KERN_ERR "rsb convert queue:\n");
190 	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
191 		dlm_print_lkb(lkb);
192 	printk(KERN_ERR "rsb wait queue:\n");
193 	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
194 		dlm_print_lkb(lkb);
195 }
196 
197 /* Threads cannot use the lockspace while it's being recovered */
198 
199 static inline void dlm_lock_recovery(struct dlm_ls *ls)
200 {
201 	down_read(&ls->ls_in_recovery);
202 }
203 
204 void dlm_unlock_recovery(struct dlm_ls *ls)
205 {
206 	up_read(&ls->ls_in_recovery);
207 }
208 
209 int dlm_lock_recovery_try(struct dlm_ls *ls)
210 {
211 	return down_read_trylock(&ls->ls_in_recovery);
212 }
213 
214 static inline int can_be_queued(struct dlm_lkb *lkb)
215 {
216 	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
217 }
218 
219 static inline int force_blocking_asts(struct dlm_lkb *lkb)
220 {
221 	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
222 }
223 
224 static inline int is_demoted(struct dlm_lkb *lkb)
225 {
226 	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
227 }
228 
229 static inline int is_altmode(struct dlm_lkb *lkb)
230 {
231 	return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
232 }
233 
234 static inline int is_granted(struct dlm_lkb *lkb)
235 {
236 	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
237 }
238 
239 static inline int is_remote(struct dlm_rsb *r)
240 {
241 	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
242 	return !!r->res_nodeid;
243 }
244 
245 static inline int is_process_copy(struct dlm_lkb *lkb)
246 {
247 	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
248 }
249 
250 static inline int is_master_copy(struct dlm_lkb *lkb)
251 {
252 	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
253 		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
254 	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
255 }
256 
257 static inline int middle_conversion(struct dlm_lkb *lkb)
258 {
259 	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
260 	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
261 		return 1;
262 	return 0;
263 }
264 
265 static inline int down_conversion(struct dlm_lkb *lkb)
266 {
267 	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
268 }
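/* e.g. EX->PR is a down conversion and can always be granted in place;
   PR->CW and CW->PR are "middle" conversions because the two modes are
   mutually incompatible (see __dlm_compat_matrix), so the conversion can
   block even though CW is numerically the lesser mode. */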
269 
270 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
271 {
272 	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
273 }
274 
275 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
276 {
277 	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
278 }
279 
280 static inline int is_overlap(struct dlm_lkb *lkb)
281 {
282 	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
283 				  DLM_IFL_OVERLAP_CANCEL));
284 }
285 
286 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
287 {
288 	if (is_master_copy(lkb))
289 		return;
290 
291 	del_timeout(lkb);
292 
293 	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
294 
295 	/* if the operation was a cancel, return -DLM_ECANCEL; if a
296 	   timeout caused the cancel, return -ETIMEDOUT */
297 	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
298 		lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
299 		rv = -ETIMEDOUT;
300 	}
301 
302 	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
303 		lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
304 		rv = -EDEADLK;
305 	}
306 
307 	lkb->lkb_lksb->sb_status = rv;
308 	lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
309 
310 	dlm_add_ast(lkb, AST_COMP);
311 }
312 
313 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
314 {
315 	queue_cast(r, lkb,
316 		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
317 }
318 
319 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
320 {
321 	if (is_master_copy(lkb))
322 		send_bast(r, lkb, rqmode);
323 	else {
324 		lkb->lkb_bastmode = rqmode;
325 		dlm_add_ast(lkb, AST_BAST);
326 	}
327 }
328 
329 /*
330  * Basic operations on rsb's and lkb's
331  */
332 
333 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
334 {
335 	struct dlm_rsb *r;
336 
337 	r = dlm_allocate_rsb(ls, len);
338 	if (!r)
339 		return NULL;
340 
341 	r->res_ls = ls;
342 	r->res_length = len;
343 	memcpy(r->res_name, name, len);
344 	mutex_init(&r->res_mutex);
345 
346 	INIT_LIST_HEAD(&r->res_lookup);
347 	INIT_LIST_HEAD(&r->res_grantqueue);
348 	INIT_LIST_HEAD(&r->res_convertqueue);
349 	INIT_LIST_HEAD(&r->res_waitqueue);
350 	INIT_LIST_HEAD(&r->res_root_list);
351 	INIT_LIST_HEAD(&r->res_recover_list);
352 
353 	return r;
354 }
355 
356 static int search_rsb_list(struct list_head *head, char *name, int len,
357 			   unsigned int flags, struct dlm_rsb **r_ret)
358 {
359 	struct dlm_rsb *r;
360 	int error = 0;
361 
362 	list_for_each_entry(r, head, res_hashchain) {
363 		if (len == r->res_length && !memcmp(name, r->res_name, len))
364 			goto found;
365 	}
366 	return -EBADR;
367 
368  found:
369 	if (r->res_nodeid && (flags & R_MASTER))
370 		error = -ENOTBLK;
371 	*r_ret = r;
372 	return error;
373 }
374 
375 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
376 		       unsigned int flags, struct dlm_rsb **r_ret)
377 {
378 	struct dlm_rsb *r;
379 	int error;
380 
381 	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
382 	if (!error) {
383 		kref_get(&r->res_ref);
384 		goto out;
385 	}
386 	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
387 	if (error)
388 		goto out;
389 
390 	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
391 
392 	if (dlm_no_directory(ls))
393 		goto out;
394 
395 	if (r->res_nodeid == -1) {
396 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
397 		r->res_first_lkid = 0;
398 	} else if (r->res_nodeid > 0) {
399 		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
400 		r->res_first_lkid = 0;
401 	} else {
402 		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
403 		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
404 	}
405  out:
406 	*r_ret = r;
407 	return error;
408 }
409 
410 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
411 		      unsigned int flags, struct dlm_rsb **r_ret)
412 {
413 	int error;
414 	write_lock(&ls->ls_rsbtbl[b].lock);
415 	error = _search_rsb(ls, name, len, b, flags, r_ret);
416 	write_unlock(&ls->ls_rsbtbl[b].lock);
417 	return error;
418 }
419 
420 /*
421  * Find rsb in rsbtbl and potentially create/add one
422  *
423  * Delaying the release of rsb's has a similar benefit to applications keeping
424  * NL locks on an rsb, but without the guarantee that the cached master value
425  * will still be valid when the rsb is reused.  Apps aren't always smart enough
426  * to keep NL locks on an rsb that they may lock again shortly; this can lead
427  * to excessive master lookups and removals if we don't delay the release.
428  *
429  * Searching for an rsb means looking through both the normal list and toss
430  * list.  When found on the toss list the rsb is moved to the normal list with
431  * ref count of 1; when found on normal list the ref count is incremented.
432  */
433 
434 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
435 		    unsigned int flags, struct dlm_rsb **r_ret)
436 {
437 	struct dlm_rsb *r, *tmp;
438 	uint32_t hash, bucket;
439 	int error = -EINVAL;
440 
441 	if (namelen > DLM_RESNAME_MAXLEN)
442 		goto out;
443 
444 	if (dlm_no_directory(ls))
445 		flags |= R_CREATE;
446 
447 	error = 0;
448 	hash = jhash(name, namelen, 0);
449 	bucket = hash & (ls->ls_rsbtbl_size - 1);
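	/* the mask relies on ls_rsbtbl_size being a power of two */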
450 
451 	error = search_rsb(ls, name, namelen, bucket, flags, &r);
452 	if (!error)
453 		goto out;
454 
455 	if (error == -EBADR && !(flags & R_CREATE))
456 		goto out;
457 
458 	/* the rsb was found but wasn't a master copy */
459 	if (error == -ENOTBLK)
460 		goto out;
461 
462 	error = -ENOMEM;
463 	r = create_rsb(ls, name, namelen);
464 	if (!r)
465 		goto out;
466 
467 	r->res_hash = hash;
468 	r->res_bucket = bucket;
469 	r->res_nodeid = -1;
470 	kref_init(&r->res_ref);
471 
472 	/* With no directory, the master can be set immediately */
473 	if (dlm_no_directory(ls)) {
474 		int nodeid = dlm_dir_nodeid(r);
475 		if (nodeid == dlm_our_nodeid())
476 			nodeid = 0;
477 		r->res_nodeid = nodeid;
478 	}
479 
480 	write_lock(&ls->ls_rsbtbl[bucket].lock);
481 	error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
482 	if (!error) {
483 		write_unlock(&ls->ls_rsbtbl[bucket].lock);
484 		dlm_free_rsb(r);
485 		r = tmp;
486 		goto out;
487 	}
488 	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
489 	write_unlock(&ls->ls_rsbtbl[bucket].lock);
490 	error = 0;
491  out:
492 	*r_ret = r;
493 	return error;
494 }
495 
496 /* This is only called to add a reference when the code already holds
497    a valid reference to the rsb, so there's no need for locking. */
498 
499 static inline void hold_rsb(struct dlm_rsb *r)
500 {
501 	kref_get(&r->res_ref);
502 }
503 
504 void dlm_hold_rsb(struct dlm_rsb *r)
505 {
506 	hold_rsb(r);
507 }
508 
509 static void toss_rsb(struct kref *kref)
510 {
511 	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
512 	struct dlm_ls *ls = r->res_ls;
513 
514 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
515 	kref_init(&r->res_ref);
516 	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
517 	r->res_toss_time = jiffies;
518 	if (r->res_lvbptr) {
519 		dlm_free_lvb(r->res_lvbptr);
520 		r->res_lvbptr = NULL;
521 	}
522 }
523 
524 /* When all references to the rsb are gone it's transferred to
525    the tossed list for later disposal. */
526 
527 static void put_rsb(struct dlm_rsb *r)
528 {
529 	struct dlm_ls *ls = r->res_ls;
530 	uint32_t bucket = r->res_bucket;
531 
532 	write_lock(&ls->ls_rsbtbl[bucket].lock);
533 	kref_put(&r->res_ref, toss_rsb);
534 	write_unlock(&ls->ls_rsbtbl[bucket].lock);
535 }
536 
537 void dlm_put_rsb(struct dlm_rsb *r)
538 {
539 	put_rsb(r);
540 }
541 
542 /* See comment for unhold_lkb */
543 
544 static void unhold_rsb(struct dlm_rsb *r)
545 {
546 	int rv;
547 	rv = kref_put(&r->res_ref, toss_rsb);
548 	DLM_ASSERT(!rv, dlm_dump_rsb(r););
549 }
550 
551 static void kill_rsb(struct kref *kref)
552 {
553 	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
554 
555 	/* All work is done after the return from kref_put() so we
556 	   can release the write_lock before the remove and free. */
557 
558 	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
559 	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
560 	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
561 	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
562 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
563 	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
564 }
565 
566 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
567    The rsb must exist as long as any lkb's for it do. */
568 
569 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
570 {
571 	hold_rsb(r);
572 	lkb->lkb_resource = r;
573 }
574 
575 static void detach_lkb(struct dlm_lkb *lkb)
576 {
577 	if (lkb->lkb_resource) {
578 		put_rsb(lkb->lkb_resource);
579 		lkb->lkb_resource = NULL;
580 	}
581 }
582 
583 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
584 {
585 	struct dlm_lkb *lkb, *tmp;
586 	uint32_t lkid = 0;
587 	uint16_t bucket;
588 
589 	lkb = dlm_allocate_lkb(ls);
590 	if (!lkb)
591 		return -ENOMEM;
592 
593 	lkb->lkb_nodeid = -1;
594 	lkb->lkb_grmode = DLM_LOCK_IV;
595 	kref_init(&lkb->lkb_ref);
596 	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
597 	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
598 	INIT_LIST_HEAD(&lkb->lkb_time_list);
599 
600 	get_random_bytes(&bucket, sizeof(bucket));
601 	bucket &= (ls->ls_lkbtbl_size - 1);
602 
603 	write_lock(&ls->ls_lkbtbl[bucket].lock);
604 
605 	/* counter can roll over so we must verify lkid is not in use */
606 
607 	while (lkid == 0) {
608 		lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
609 
610 		list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
611 				    lkb_idtbl_list) {
612 			if (tmp->lkb_id != lkid)
613 				continue;
614 			lkid = 0;
615 			break;
616 		}
617 	}
618 
619 	lkb->lkb_id = lkid;
620 	list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
621 	write_unlock(&ls->ls_lkbtbl[bucket].lock);
622 
623 	*lkb_ret = lkb;
624 	return 0;
625 }
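/* Illustrative only: the lkid layout produced above and decoded below by
   __find_lkb()/find_lkb():

	lkid = 0xBBBBCCCC
	       BBBB = lkbtbl bucket (lkid >> 16)
	       CCCC = low 16 bits of the bucket's counter at creation */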
626 
627 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
628 {
629 	struct dlm_lkb *lkb;
630 	uint16_t bucket = (lkid >> 16);
631 
632 	list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
633 		if (lkb->lkb_id == lkid)
634 			return lkb;
635 	}
636 	return NULL;
637 }
638 
639 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
640 {
641 	struct dlm_lkb *lkb;
642 	uint16_t bucket = (lkid >> 16);
643 
644 	if (bucket >= ls->ls_lkbtbl_size)
645 		return -EBADSLT;
646 
647 	read_lock(&ls->ls_lkbtbl[bucket].lock);
648 	lkb = __find_lkb(ls, lkid);
649 	if (lkb)
650 		kref_get(&lkb->lkb_ref);
651 	read_unlock(&ls->ls_lkbtbl[bucket].lock);
652 
653 	*lkb_ret = lkb;
654 	return lkb ? 0 : -ENOENT;
655 }
656 
657 static void kill_lkb(struct kref *kref)
658 {
659 	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
660 
661 	/* All work is done after the return from kref_put() so we
662 	   can release the write_lock before the detach_lkb */
663 
664 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
665 }
666 
667 /* __put_lkb() is used when an lkb may not have an rsb attached to
668    it so we need to provide the lockspace explicitly */
669 
670 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
671 {
672 	uint16_t bucket = (lkb->lkb_id >> 16);
673 
674 	write_lock(&ls->ls_lkbtbl[bucket].lock);
675 	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
676 		list_del(&lkb->lkb_idtbl_list);
677 		write_unlock(&ls->ls_lkbtbl[bucket].lock);
678 
679 		detach_lkb(lkb);
680 
681 		/* for local/process lkbs, lvbptr points to caller's lksb */
682 		if (lkb->lkb_lvbptr && is_master_copy(lkb))
683 			dlm_free_lvb(lkb->lkb_lvbptr);
684 		dlm_free_lkb(lkb);
685 		return 1;
686 	} else {
687 		write_unlock(&ls->ls_lkbtbl[bucket].lock);
688 		return 0;
689 	}
690 }
691 
692 int dlm_put_lkb(struct dlm_lkb *lkb)
693 {
694 	struct dlm_ls *ls;
695 
696 	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
697 	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
698 
699 	ls = lkb->lkb_resource->res_ls;
700 	return __put_lkb(ls, lkb);
701 }
702 
703 /* This is only called to add a reference when the code already holds
704    a valid reference to the lkb, so there's no need for locking. */
705 
706 static inline void hold_lkb(struct dlm_lkb *lkb)
707 {
708 	kref_get(&lkb->lkb_ref);
709 }
710 
711 /* This is called when we need to remove a reference and are certain
712    it's not the last ref.  e.g. del_lkb is always called between a
713    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
714    put_lkb would work fine, but would involve unnecessary locking */
715 
716 static inline void unhold_lkb(struct dlm_lkb *lkb)
717 {
718 	int rv;
719 	rv = kref_put(&lkb->lkb_ref, kill_lkb);
720 	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
721 }
722 
723 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
724 			    int mode)
725 {
726 	struct dlm_lkb *lkb = NULL;
727 
728 	list_for_each_entry(lkb, head, lkb_statequeue)
729 		if (lkb->lkb_rqmode < mode)
730 			break;
731 
732 	if (!lkb)
733 		list_add_tail(new, head);
734 	else
735 		__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
736 }
737 
738 /* add/remove lkb to rsb's grant/convert/wait queue */
739 
740 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
741 {
742 	kref_get(&lkb->lkb_ref);
743 
744 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
745 
746 	lkb->lkb_status = status;
747 
748 	switch (status) {
749 	case DLM_LKSTS_WAITING:
750 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
751 			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
752 		else
753 			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
754 		break;
755 	case DLM_LKSTS_GRANTED:
756 		/* convention says granted locks kept in order of grmode */
757 		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
758 				lkb->lkb_grmode);
759 		break;
760 	case DLM_LKSTS_CONVERT:
761 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
762 			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
763 		else
764 			list_add_tail(&lkb->lkb_statequeue,
765 				      &r->res_convertqueue);
766 		break;
767 	default:
768 		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
769 	}
770 }
771 
772 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
773 {
774 	lkb->lkb_status = 0;
775 	list_del(&lkb->lkb_statequeue);
776 	unhold_lkb(lkb);
777 }
778 
779 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
780 {
781 	hold_lkb(lkb);
782 	del_lkb(r, lkb);
783 	add_lkb(r, lkb, sts);
784 	unhold_lkb(lkb);
785 }
786 
787 static int msg_reply_type(int mstype)
788 {
789 	switch (mstype) {
790 	case DLM_MSG_REQUEST:
791 		return DLM_MSG_REQUEST_REPLY;
792 	case DLM_MSG_CONVERT:
793 		return DLM_MSG_CONVERT_REPLY;
794 	case DLM_MSG_UNLOCK:
795 		return DLM_MSG_UNLOCK_REPLY;
796 	case DLM_MSG_CANCEL:
797 		return DLM_MSG_CANCEL_REPLY;
798 	case DLM_MSG_LOOKUP:
799 		return DLM_MSG_LOOKUP_REPLY;
800 	}
801 	return -1;
802 }
803 
804 /* add/remove lkb from global waiters list of lkb's waiting for
805    a reply from a remote node */
806 
807 static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
808 {
809 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
810 	int error = 0;
811 
812 	mutex_lock(&ls->ls_waiters_mutex);
813 
814 	if (is_overlap_unlock(lkb) ||
815 	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
816 		error = -EINVAL;
817 		goto out;
818 	}
819 
820 	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
821 		switch (mstype) {
822 		case DLM_MSG_UNLOCK:
823 			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
824 			break;
825 		case DLM_MSG_CANCEL:
826 			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
827 			break;
828 		default:
829 			error = -EBUSY;
830 			goto out;
831 		}
832 		lkb->lkb_wait_count++;
833 		hold_lkb(lkb);
834 
835 		log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
836 			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
837 			  lkb->lkb_wait_count, lkb->lkb_flags);
838 		goto out;
839 	}
840 
841 	DLM_ASSERT(!lkb->lkb_wait_count,
842 		   dlm_print_lkb(lkb);
843 		   printk("wait_count %d\n", lkb->lkb_wait_count););
844 
845 	lkb->lkb_wait_count++;
846 	lkb->lkb_wait_type = mstype;
847 	hold_lkb(lkb);
848 	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
849  out:
850 	if (error)
851 		log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
852 			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
853 			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
854 	mutex_unlock(&ls->ls_waiters_mutex);
855 	return error;
856 }
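/* Example of the overlap accounting above: an unlock arriving while a
   request is still awaiting its reply sets DLM_IFL_OVERLAP_UNLOCK and
   bumps lkb_wait_count to 2; each reply then drops one count in
   _remove_from_waiters() below. */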
857 
858 /* We clear the RESEND flag because we might be taking an lkb off the waiters
859    list as part of process_requestqueue (e.g. a lookup that has an optimized
860    request reply on the requestqueue) between dlm_recover_waiters_pre() which
861    set RESEND and dlm_recover_waiters_post() */
862 
863 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
864 {
865 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
866 	int overlap_done = 0;
867 
868 	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
869 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
870 		overlap_done = 1;
871 		goto out_del;
872 	}
873 
874 	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
875 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
876 		overlap_done = 1;
877 		goto out_del;
878 	}
879 
880 	/* N.B. type of reply may not always correspond to type of original
881 	   msg due to lookup->request optimization, verify others? */
882 
883 	if (lkb->lkb_wait_type) {
884 		lkb->lkb_wait_type = 0;
885 		goto out_del;
886 	}
887 
888 	log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
889 		  lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
890 	return -1;
891 
892  out_del:
893 	/* the force-unlock/cancel has completed and we haven't received a reply
894 	   to the op that was in progress prior to the unlock/cancel; we
895 	   give up on any reply to the earlier op.  FIXME: not sure when/how
896 	   this would happen */
897 
898 	if (overlap_done && lkb->lkb_wait_type) {
899 		log_error(ls, "remove_from_waiters %x reply %d give up on %d",
900 			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
901 		lkb->lkb_wait_count--;
902 		lkb->lkb_wait_type = 0;
903 	}
904 
905 	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
906 
907 	lkb->lkb_flags &= ~DLM_IFL_RESEND;
908 	lkb->lkb_wait_count--;
909 	if (!lkb->lkb_wait_count)
910 		list_del_init(&lkb->lkb_wait_reply);
911 	unhold_lkb(lkb);
912 	return 0;
913 }
914 
915 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
916 {
917 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
918 	int error;
919 
920 	mutex_lock(&ls->ls_waiters_mutex);
921 	error = _remove_from_waiters(lkb, mstype);
922 	mutex_unlock(&ls->ls_waiters_mutex);
923 	return error;
924 }
925 
926 /* Handles situations where we might be processing a "fake" or "stub" reply in
927    which we can't try to take waiters_mutex again. */
928 
929 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
930 {
931 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
932 	int error;
933 
934 	if (ms != &ls->ls_stub_ms)
935 		mutex_lock(&ls->ls_waiters_mutex);
936 	error = _remove_from_waiters(lkb, ms->m_type);
937 	if (ms != &ls->ls_stub_ms)
938 		mutex_unlock(&ls->ls_waiters_mutex);
939 	return error;
940 }
941 
942 static void dir_remove(struct dlm_rsb *r)
943 {
944 	int to_nodeid;
945 
946 	if (dlm_no_directory(r->res_ls))
947 		return;
948 
949 	to_nodeid = dlm_dir_nodeid(r);
950 	if (to_nodeid != dlm_our_nodeid())
951 		send_remove(r);
952 	else
953 		dlm_dir_remove_entry(r->res_ls, to_nodeid,
954 				     r->res_name, r->res_length);
955 }
956 
957 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
958    found since they are in order of newest to oldest? */
959 
960 static int shrink_bucket(struct dlm_ls *ls, int b)
961 {
962 	struct dlm_rsb *r;
963 	int count = 0, found;
964 
965 	for (;;) {
966 		found = 0;
967 		write_lock(&ls->ls_rsbtbl[b].lock);
968 		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
969 					    res_hashchain) {
970 			if (!time_after_eq(jiffies, r->res_toss_time +
971 					   dlm_config.ci_toss_secs * HZ))
972 				continue;
973 			found = 1;
974 			break;
975 		}
976 
977 		if (!found) {
978 			write_unlock(&ls->ls_rsbtbl[b].lock);
979 			break;
980 		}
981 
982 		if (kref_put(&r->res_ref, kill_rsb)) {
983 			list_del(&r->res_hashchain);
984 			write_unlock(&ls->ls_rsbtbl[b].lock);
985 
986 			if (is_master(r))
987 				dir_remove(r);
988 			dlm_free_rsb(r);
989 			count++;
990 		} else {
991 			write_unlock(&ls->ls_rsbtbl[b].lock);
992 			log_error(ls, "tossed rsb in use %s", r->res_name);
993 		}
994 	}
995 
996 	return count;
997 }
998 
999 void dlm_scan_rsbs(struct dlm_ls *ls)
1000 {
1001 	int i;
1002 
1003 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1004 		shrink_bucket(ls, i);
1005 		if (dlm_locking_stopped(ls))
1006 			break;
1007 		cond_resched();
1008 	}
1009 }
1010 
1011 static void add_timeout(struct dlm_lkb *lkb)
1012 {
1013 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1014 
1015 	if (is_master_copy(lkb)) {
1016 		lkb->lkb_timestamp = jiffies;
1017 		return;
1018 	}
1019 
1020 	if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1021 	    !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1022 		lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1023 		goto add_it;
1024 	}
1025 	if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1026 		goto add_it;
1027 	return;
1028 
1029  add_it:
1030 	DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1031 	mutex_lock(&ls->ls_timeout_mutex);
1032 	hold_lkb(lkb);
1033 	lkb->lkb_timestamp = jiffies;
1034 	list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1035 	mutex_unlock(&ls->ls_timeout_mutex);
1036 }
1037 
1038 static void del_timeout(struct dlm_lkb *lkb)
1039 {
1040 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1041 
1042 	mutex_lock(&ls->ls_timeout_mutex);
1043 	if (!list_empty(&lkb->lkb_time_list)) {
1044 		list_del_init(&lkb->lkb_time_list);
1045 		unhold_lkb(lkb);
1046 	}
1047 	mutex_unlock(&ls->ls_timeout_mutex);
1048 }
1049 
1050 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1051    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1052    and then lock rsb because of lock ordering in add_timeout.  We may need
1053    to specify some special timeout-related bits in the lkb that are just to
1054    be accessed under the timeout_mutex. */
1055 
1056 void dlm_scan_timeout(struct dlm_ls *ls)
1057 {
1058 	struct dlm_rsb *r;
1059 	struct dlm_lkb *lkb;
1060 	int do_cancel, do_warn;
1061 
1062 	for (;;) {
1063 		if (dlm_locking_stopped(ls))
1064 			break;
1065 
1066 		do_cancel = 0;
1067 		do_warn = 0;
1068 		mutex_lock(&ls->ls_timeout_mutex);
1069 		list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1070 
1071 			if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1072 			    time_after_eq(jiffies, lkb->lkb_timestamp +
1073 					  lkb->lkb_timeout_cs * HZ/100))
1074 				do_cancel = 1;
1075 
1076 			if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1077 			    time_after_eq(jiffies, lkb->lkb_timestamp +
1078 				   	   dlm_config.ci_timewarn_cs * HZ/100))
1079 				do_warn = 1;
1080 
1081 			if (!do_cancel && !do_warn)
1082 				continue;
1083 			hold_lkb(lkb);
1084 			break;
1085 		}
1086 		mutex_unlock(&ls->ls_timeout_mutex);
1087 
1088 		if (!do_cancel && !do_warn)
1089 			break;
1090 
1091 		r = lkb->lkb_resource;
1092 		hold_rsb(r);
1093 		lock_rsb(r);
1094 
1095 		if (do_warn) {
1096 			/* clear flag so we only warn once */
1097 			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1098 			if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1099 				del_timeout(lkb);
1100 			dlm_timeout_warn(lkb);
1101 		}
1102 
1103 		if (do_cancel) {
1104 			log_debug(ls, "timeout cancel %x node %d %s",
1105 				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1106 			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1107 			lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1108 			del_timeout(lkb);
1109 			_cancel_lock(r, lkb);
1110 		}
1111 
1112 		unlock_rsb(r);
1113 		unhold_rsb(r);
1114 		dlm_put_lkb(lkb);
1115 	}
1116 }
1117 
1118 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1119    dlm_recoverd before checking/setting ls_recover_begin. */
1120 
1121 void dlm_adjust_timeouts(struct dlm_ls *ls)
1122 {
1123 	struct dlm_lkb *lkb;
1124 	long adj = jiffies - ls->ls_recover_begin;
1125 
1126 	ls->ls_recover_begin = 0;
1127 	mutex_lock(&ls->ls_timeout_mutex);
1128 	list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1129 		lkb->lkb_timestamp += adj;
1130 	mutex_unlock(&ls->ls_timeout_mutex);
1131 }
1132 
1133 /* lkb is master or local copy */
1134 
1135 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1136 {
1137 	int b, len = r->res_ls->ls_lvblen;
1138 
1139 	/* b=1 lvb returned to caller
1140 	   b=0 lvb written to rsb or invalidated
1141 	   b=-1 do nothing */
1142 
1143 	b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1144 
1145 	if (b == 1) {
1146 		if (!lkb->lkb_lvbptr)
1147 			return;
1148 
1149 		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1150 			return;
1151 
1152 		if (!r->res_lvbptr)
1153 			return;
1154 
1155 		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1156 		lkb->lkb_lvbseq = r->res_lvbseq;
1157 
1158 	} else if (b == 0) {
1159 		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1160 			rsb_set_flag(r, RSB_VALNOTVALID);
1161 			return;
1162 		}
1163 
1164 		if (!lkb->lkb_lvbptr)
1165 			return;
1166 
1167 		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1168 			return;
1169 
1170 		if (!r->res_lvbptr)
1171 			r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1172 
1173 		if (!r->res_lvbptr)
1174 			return;
1175 
1176 		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1177 		r->res_lvbseq++;
1178 		lkb->lkb_lvbseq = r->res_lvbseq;
1179 		rsb_clear_flag(r, RSB_VALNOTVALID);
1180 	}
1181 
1182 	if (rsb_flag(r, RSB_VALNOTVALID))
1183 		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1184 }
1185 
1186 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1187 {
1188 	if (lkb->lkb_grmode < DLM_LOCK_PW)
1189 		return;
1190 
1191 	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1192 		rsb_set_flag(r, RSB_VALNOTVALID);
1193 		return;
1194 	}
1195 
1196 	if (!lkb->lkb_lvbptr)
1197 		return;
1198 
1199 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1200 		return;
1201 
1202 	if (!r->res_lvbptr)
1203 		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1204 
1205 	if (!r->res_lvbptr)
1206 		return;
1207 
1208 	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1209 	r->res_lvbseq++;
1210 	rsb_clear_flag(r, RSB_VALNOTVALID);
1211 }
1212 
1213 /* lkb is process copy (pc) */
1214 
1215 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1216 			    struct dlm_message *ms)
1217 {
1218 	int b;
1219 
1220 	if (!lkb->lkb_lvbptr)
1221 		return;
1222 
1223 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1224 		return;
1225 
1226 	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1227 	if (b == 1) {
1228 		int len = receive_extralen(ms);
1229 		if (len > DLM_RESNAME_MAXLEN)
1230 			len = DLM_RESNAME_MAXLEN;
1231 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1232 		lkb->lkb_lvbseq = ms->m_lvbseq;
1233 	}
1234 }
1235 
1236 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1237    remove_lock -- used for unlock, removes lkb from granted
1238    revert_lock -- used for cancel, moves lkb from convert to granted
1239    grant_lock  -- used for request and convert, adds lkb to granted or
1240                   moves lkb from convert or waiting to granted
1241 
1242    Each of these is used for master or local copy lkb's.  There is
1243    also a _pc() variation used to make the corresponding change on
1244    a process copy (pc) lkb. */
1245 
1246 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1247 {
1248 	del_lkb(r, lkb);
1249 	lkb->lkb_grmode = DLM_LOCK_IV;
1250 	/* this unhold undoes the original ref from create_lkb()
1251 	   so this leads to the lkb being freed */
1252 	unhold_lkb(lkb);
1253 }
1254 
1255 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1256 {
1257 	set_lvb_unlock(r, lkb);
1258 	_remove_lock(r, lkb);
1259 }
1260 
1261 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1262 {
1263 	_remove_lock(r, lkb);
1264 }
1265 
1266 /* returns: 0 did nothing
1267 	    1 moved lock to granted
1268 	   -1 removed lock */
1269 
1270 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1271 {
1272 	int rv = 0;
1273 
1274 	lkb->lkb_rqmode = DLM_LOCK_IV;
1275 
1276 	switch (lkb->lkb_status) {
1277 	case DLM_LKSTS_GRANTED:
1278 		break;
1279 	case DLM_LKSTS_CONVERT:
1280 		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1281 		rv = 1;
1282 		break;
1283 	case DLM_LKSTS_WAITING:
1284 		del_lkb(r, lkb);
1285 		lkb->lkb_grmode = DLM_LOCK_IV;
1286 		/* this unhold undoes the original ref from create_lkb()
1287 		   so this leads to the lkb being freed */
1288 		unhold_lkb(lkb);
1289 		rv = -1;
1290 		break;
1291 	default:
1292 		log_print("invalid status for revert %d", lkb->lkb_status);
1293 	}
1294 	return rv;
1295 }
1296 
1297 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1298 {
1299 	return revert_lock(r, lkb);
1300 }
1301 
1302 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1303 {
1304 	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1305 		lkb->lkb_grmode = lkb->lkb_rqmode;
1306 		if (lkb->lkb_status)
1307 			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1308 		else
1309 			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1310 	}
1311 
1312 	lkb->lkb_rqmode = DLM_LOCK_IV;
1313 }
1314 
1315 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1316 {
1317 	set_lvb_lock(r, lkb);
1318 	_grant_lock(r, lkb);
1319 	lkb->lkb_highbast = 0;
1320 }
1321 
1322 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1323 			  struct dlm_message *ms)
1324 {
1325 	set_lvb_lock_pc(r, lkb, ms);
1326 	_grant_lock(r, lkb);
1327 }
1328 
1329 /* called by grant_pending_locks() which means an async grant message must
1330    be sent to the requesting node in addition to granting the lock if the
1331    lkb belongs to a remote node. */
1332 
1333 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1334 {
1335 	grant_lock(r, lkb);
1336 	if (is_master_copy(lkb))
1337 		send_grant(r, lkb);
1338 	else
1339 		queue_cast(r, lkb, 0);
1340 }
1341 
1342 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1343    change the granted/requested modes.  We're munging things accordingly in
1344    the process copy.
1345    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1346    conversion deadlock
1347    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1348    compatible with other granted locks */
1349 
1350 static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1351 {
1352 	if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1353 		log_print("munge_demoted %x invalid reply type %d",
1354 			  lkb->lkb_id, ms->m_type);
1355 		return;
1356 	}
1357 
1358 	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1359 		log_print("munge_demoted %x invalid modes gr %d rq %d",
1360 			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1361 		return;
1362 	}
1363 
1364 	lkb->lkb_grmode = DLM_LOCK_NL;
1365 }
1366 
1367 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1368 {
1369 	if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1370 	    ms->m_type != DLM_MSG_GRANT) {
1371 		log_print("munge_altmode %x invalid reply type %d",
1372 			  lkb->lkb_id, ms->m_type);
1373 		return;
1374 	}
1375 
1376 	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1377 		lkb->lkb_rqmode = DLM_LOCK_PR;
1378 	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1379 		lkb->lkb_rqmode = DLM_LOCK_CW;
1380 	else {
1381 		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1382 		dlm_print_lkb(lkb);
1383 	}
1384 }
1385 
1386 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1387 {
1388 	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1389 					   lkb_statequeue);
1390 	if (lkb->lkb_id == first->lkb_id)
1391 		return 1;
1392 
1393 	return 0;
1394 }
1395 
1396 /* Check if the given lkb conflicts with another lkb on the queue. */
1397 
1398 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1399 {
1400 	struct dlm_lkb *this;
1401 
1402 	list_for_each_entry(this, head, lkb_statequeue) {
1403 		if (this == lkb)
1404 			continue;
1405 		if (!modes_compat(this, lkb))
1406 			return 1;
1407 	}
1408 	return 0;
1409 }
1410 
1411 /*
1412  * "A conversion deadlock arises with a pair of lock requests in the converting
1413  * queue for one resource.  The granted mode of each lock blocks the requested
1414  * mode of the other lock."
1415  *
1416  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1417  * convert queue from being granted, then deadlk/demote lkb.
1418  *
1419  * Example:
1420  * Granted Queue: empty
1421  * Convert Queue: NL->EX (first lock)
1422  *                PR->EX (second lock)
1423  *
1424  * The first lock can't be granted because of the granted mode of the second
1425  * lock and the second lock can't be granted because it's not first in the
1426  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1427  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1428  * flag set and return DEMOTED in the lksb flags.
1429  *
1430  * Originally, this function detected conv-deadlk in a more limited scope:
1431  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1432  * - if lkb1 was the first entry in the queue (not just earlier), and was
1433  *   blocked by the granted mode of lkb2, and there was nothing on the
1434  *   granted queue preventing lkb1 from being granted immediately, i.e.
1435  *   lkb2 was the only thing preventing lkb1 from being granted.
1436  *
1437  * That second condition meant we'd only say there was conv-deadlk if
1438  * resolving it (by demotion) would lead to the first lock on the convert
1439  * queue being granted right away.  It allowed conversion deadlocks to exist
1440  * between locks on the convert queue while they couldn't be granted anyway.
1441  *
1442  * Now, we detect and take action on conversion deadlocks immediately when
1443  * they're created, even if they may not be immediately consequential.  If
1444  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1445  * mode that would prevent lkb1's conversion from being granted, we do a
1446  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1447  * I think this means that the lkb_is_ahead condition below should always
1448  * be zero, i.e. there will never be conv-deadlk between two locks that are
1449  * both already on the convert queue.
1450  */
1451 
1452 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1453 {
1454 	struct dlm_lkb *lkb1;
1455 	int lkb_is_ahead = 0;
1456 
1457 	list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1458 		if (lkb1 == lkb2) {
1459 			lkb_is_ahead = 1;
1460 			continue;
1461 		}
1462 
1463 		if (!lkb_is_ahead) {
1464 			if (!modes_compat(lkb2, lkb1))
1465 				return 1;
1466 		} else {
1467 			if (!modes_compat(lkb2, lkb1) &&
1468 			    !modes_compat(lkb1, lkb2))
1469 				return 1;
1470 		}
1471 	}
1472 	return 0;
1473 }
1474 
1475 /*
1476  * Return 1 if the lock can be granted, 0 otherwise.
1477  * Also detect and resolve conversion deadlocks.
1478  *
1479  * lkb is the lock to be granted
1480  *
1481  * now is 1 if the function is being called in the context of the
1482  * immediate request, it is 0 if called later, after the lock has been
1483  * queued.
1484  *
1485  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1486  */
1487 
1488 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1489 {
1490 	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1491 
1492 	/*
1493 	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
1494 	 * a new request for a NL mode lock being blocked.
1495 	 *
1496 	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1497 	 * request, then it would be granted.  In essence, the use of this flag
1498 	 * tells the Lock Manager to expedite this request by not considering
1499 	 * what may be in the CONVERTING or WAITING queues...  As of this
1500 	 * writing, the EXPEDITE flag can be used only with new requests for NL
1501 	 * mode locks.  This flag is not valid for conversion requests.
1502 	 *
1503 	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1504 	 * conversion or used with a non-NL requested mode.  We also know an
1505 	 * EXPEDITE request is always granted immediately, so now must always
1506 	 * be 1.  The full condition to grant an expedite request: (now &&
1507 	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1508 	 * therefore be shortened to just checking the flag.
1509 	 */
1510 
1511 	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1512 		return 1;
1513 
1514 	/*
1515 	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1516 	 * added to the remaining conditions.
1517 	 */
1518 
1519 	if (queue_conflict(&r->res_grantqueue, lkb))
1520 		goto out;
1521 
1522 	/*
1523 	 * 6-3: By default, a conversion request is immediately granted if the
1524 	 * requested mode is compatible with the modes of all other granted
1525 	 * locks
1526 	 */
1527 
1528 	if (queue_conflict(&r->res_convertqueue, lkb))
1529 		goto out;
1530 
1531 	/*
1532 	 * 6-5: But the default algorithm for deciding whether to grant or
1533 	 * queue conversion requests does not by itself guarantee that such
1534 	 * requests are serviced on a "first come first serve" basis.  This, in
1535 	 * turn, can lead to a phenomenon known as "indefinite postponement".
1536 	 *
1537 	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
1538 	 * the system service employed to request a lock conversion.  This flag
1539 	 * forces certain conversion requests to be queued, even if they are
1540 	 * compatible with the granted modes of other locks on the same
1541 	 * resource.  Thus, the use of this flag results in conversion requests
1542 	 * being ordered on a "first come first serve" basis.
1543 	 *
1544 	 * DCT: This condition is all about new conversions being able to occur
1545 	 * "in place" while the lock remains on the granted queue (assuming
1546 	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1547 	 * doesn't _have_ to go onto the convert queue where it's processed in
1548 	 * order.  The "now" variable is necessary to distinguish converts
1549 	 * being received and processed for the first time now, because once a
1550 	 * convert is moved to the conversion queue the condition below applies
1551 	 * requiring fifo granting.
1552 	 */
1553 
1554 	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1555 		return 1;
1556 
1557 	/*
1558 	 * The NOORDER flag is set to avoid the standard vms rules on grant
1559 	 * order.
1560 	 */
1561 
1562 	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1563 		return 1;
1564 
1565 	/*
1566 	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1567 	 * granted until all other conversion requests ahead of it are granted
1568 	 * and/or canceled.
1569 	 */
1570 
1571 	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1572 		return 1;
1573 
1574 	/*
1575 	 * 6-4: By default, a new request is immediately granted only if all
1576 	 * three of the following conditions are satisfied when the request is
1577 	 * issued:
1578 	 * - The queue of ungranted conversion requests for the resource is
1579 	 *   empty.
1580 	 * - The queue of ungranted new requests for the resource is empty.
1581 	 * - The mode of the new request is compatible with the most
1582 	 *   restrictive mode of all granted locks on the resource.
1583 	 */
1584 
1585 	if (now && !conv && list_empty(&r->res_convertqueue) &&
1586 	    list_empty(&r->res_waitqueue))
1587 		return 1;
1588 
1589 	/*
1590 	 * 6-4: Once a lock request is in the queue of ungranted new requests,
1591 	 * it cannot be granted until the queue of ungranted conversion
1592 	 * requests is empty, all ungranted new requests ahead of it are
1593 	 * granted and/or canceled, and it is compatible with the granted mode
1594 	 * of the most restrictive lock granted on the resource.
1595 	 */
1596 
1597 	if (!now && !conv && list_empty(&r->res_convertqueue) &&
1598 	    first_in_list(lkb, &r->res_waitqueue))
1599 		return 1;
1600  out:
1601 	return 0;
1602 }
1603 
1604 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1605 			  int *err)
1606 {
1607 	int rv;
1608 	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1609 	int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1610 
1611 	if (err)
1612 		*err = 0;
1613 
1614 	rv = _can_be_granted(r, lkb, now);
1615 	if (rv)
1616 		goto out;
1617 
1618 	/*
1619 	 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1620 	 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1621 	 * cancels one of the locks.
1622 	 */
1623 
1624 	if (is_convert && can_be_queued(lkb) &&
1625 	    conversion_deadlock_detect(r, lkb)) {
1626 		if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1627 			lkb->lkb_grmode = DLM_LOCK_NL;
1628 			lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1629 		} else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1630 			if (err)
1631 				*err = -EDEADLK;
1632 			else {
1633 				log_print("can_be_granted deadlock %x now %d",
1634 					  lkb->lkb_id, now);
1635 				dlm_dump_rsb(r);
1636 			}
1637 		}
1638 		goto out;
1639 	}
1640 
1641 	/*
1642 	 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1643 	 * to grant a request in a mode other than the normal rqmode.  It's a
1644 	 * simple way to provide a big optimization to applications that can
1645 	 * use them.
1646 	 */
1647 
1648 	if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1649 		alt = DLM_LOCK_PR;
1650 	else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1651 		alt = DLM_LOCK_CW;
1652 
1653 	if (alt) {
1654 		lkb->lkb_rqmode = alt;
1655 		rv = _can_be_granted(r, lkb, now);
1656 		if (rv)
1657 			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1658 		else
1659 			lkb->lkb_rqmode = rqmode;
1660 	}
1661  out:
1662 	return rv;
1663 }
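/* Example of the alt-mode fallback above: a DLM_LOCK_CW request with
   DLM_LKF_ALTPR set that cannot be granted as CW is retried as PR; if
   that succeeds, the lock is granted in PR and DLM_SBF_ALTMODE is
   returned in the lksb flags. */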
1664 
1665 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1666    for locks pending on the convert list.  Once verified (watch for these
1667    log_prints), we should be able to just call _can_be_granted() and not
1668    bother with the demote/deadlk cases here (and there's no easy way to deal
1669    with a deadlk here, we'd have to generate something like grant_lock with
1670    the deadlk error.) */
1671 
1672 /* Returns the highest requested mode of all blocked conversions; sets
1673    cw if there's a blocked conversion to DLM_LOCK_CW. */
1674 
1675 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1676 {
1677 	struct dlm_lkb *lkb, *s;
1678 	int hi, demoted, quit, grant_restart, demote_restart;
1679 	int deadlk;
1680 
1681 	quit = 0;
1682  restart:
1683 	grant_restart = 0;
1684 	demote_restart = 0;
1685 	hi = DLM_LOCK_IV;
1686 
1687 	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1688 		demoted = is_demoted(lkb);
1689 		deadlk = 0;
1690 
1691 		if (can_be_granted(r, lkb, 0, &deadlk)) {
1692 			grant_lock_pending(r, lkb);
1693 			grant_restart = 1;
1694 			continue;
1695 		}
1696 
1697 		if (!demoted && is_demoted(lkb)) {
1698 			log_print("WARN: pending demoted %x node %d %s",
1699 				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1700 			demote_restart = 1;
1701 			continue;
1702 		}
1703 
1704 		if (deadlk) {
1705 			log_print("WARN: pending deadlock %x node %d %s",
1706 				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1707 			dlm_dump_rsb(r);
1708 			continue;
1709 		}
1710 
1711 		hi = max_t(int, lkb->lkb_rqmode, hi);
1712 
1713 		if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1714 			*cw = 1;
1715 	}
1716 
1717 	if (grant_restart)
1718 		goto restart;
1719 	if (demote_restart && !quit) {
1720 		quit = 1;
1721 		goto restart;
1722 	}
1723 
1724 	return max_t(int, high, hi);
1725 }
1726 
1727 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1728 {
1729 	struct dlm_lkb *lkb, *s;
1730 
1731 	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1732 		if (can_be_granted(r, lkb, 0, NULL))
1733 			grant_lock_pending(r, lkb);
1734 		else {
1735 			high = max_t(int, lkb->lkb_rqmode, high);
1736 			if (lkb->lkb_rqmode == DLM_LOCK_CW)
1737 				*cw = 1;
1738 		}
1739 	}
1740 
1741 	return high;
1742 }
1743 
1744 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1745    on either the convert or waiting queue.
1746    high is the largest rqmode of all locks blocked on the convert or
1747    waiting queue. */
1748 
1749 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1750 {
1751 	if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1752 		if (gr->lkb_highbast < DLM_LOCK_EX)
1753 			return 1;
1754 		return 0;
1755 	}
1756 
1757 	if (gr->lkb_highbast < high &&
1758 	    !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1759 		return 1;
1760 	return 0;
1761 }
1762 
1763 static void grant_pending_locks(struct dlm_rsb *r)
1764 {
1765 	struct dlm_lkb *lkb, *s;
1766 	int high = DLM_LOCK_IV;
1767 	int cw = 0;
1768 
1769 	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1770 
1771 	high = grant_pending_convert(r, high, &cw);
1772 	high = grant_pending_wait(r, high, &cw);
1773 
1774 	if (high == DLM_LOCK_IV)
1775 		return;
1776 
1777 	/*
1778 	 * If there are locks left on the wait/convert queue then send blocking
1779 	 * ASTs to granted locks based on the largest requested mode (high)
1780 	 * found above.
1781 	 */
1782 
1783 	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1784 		if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1785 			if (cw && high == DLM_LOCK_PR)
1786 				queue_bast(r, lkb, DLM_LOCK_CW);
1787 			else
1788 				queue_bast(r, lkb, high);
1789 			lkb->lkb_highbast = high;
1790 		}
1791 	}
1792 }
1793 
1794 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1795 {
1796 	if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1797 	    (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1798 		if (gr->lkb_highbast < DLM_LOCK_EX)
1799 			return 1;
1800 		return 0;
1801 	}
1802 
1803 	if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1804 		return 1;
1805 	return 0;
1806 }
1807 
1808 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1809 			    struct dlm_lkb *lkb)
1810 {
1811 	struct dlm_lkb *gr;
1812 
1813 	list_for_each_entry(gr, head, lkb_statequeue) {
1814 		if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
1815 			queue_bast(r, gr, lkb->lkb_rqmode);
1816 			gr->lkb_highbast = lkb->lkb_rqmode;
1817 		}
1818 	}
1819 }
1820 
1821 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1822 {
1823 	send_bast_queue(r, &r->res_grantqueue, lkb);
1824 }
1825 
1826 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1827 {
1828 	send_bast_queue(r, &r->res_grantqueue, lkb);
1829 	send_bast_queue(r, &r->res_convertqueue, lkb);
1830 }
1831 
1832 /* set_master(r, lkb) -- set the master nodeid of a resource
1833 
1834    The purpose of this function is to set the nodeid field in the given
1835    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1836    known, it can just be copied to the lkb and the function will return
1837    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1838    before it can be copied to the lkb.
1839 
1840    When the rsb nodeid is being looked up remotely, the initial lkb
1841    causing the lookup is kept on the ls_waiters list waiting for the
1842    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1843    on the rsb's res_lookup list until the master is verified.
1844 
1845    Return values:
1846    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1847    1: the rsb master is not available and the lkb has been placed on
1848       a wait queue
1849 */
1850 
1851 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1852 {
1853 	struct dlm_ls *ls = r->res_ls;
1854 	int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1855 
1856 	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1857 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1858 		r->res_first_lkid = lkb->lkb_id;
1859 		lkb->lkb_nodeid = r->res_nodeid;
1860 		return 0;
1861 	}
1862 
1863 	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1864 		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1865 		return 1;
1866 	}
1867 
1868 	if (r->res_nodeid == 0) {
1869 		lkb->lkb_nodeid = 0;
1870 		return 0;
1871 	}
1872 
1873 	if (r->res_nodeid > 0) {
1874 		lkb->lkb_nodeid = r->res_nodeid;
1875 		return 0;
1876 	}
1877 
1878 	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1879 
1880 	dir_nodeid = dlm_dir_nodeid(r);
1881 
1882 	if (dir_nodeid != our_nodeid) {
1883 		r->res_first_lkid = lkb->lkb_id;
1884 		send_lookup(r, lkb);
1885 		return 1;
1886 	}
1887 
1888 	for (i = 0; i < 2; i++) {
1889 		/* It's possible for dlm_scand to remove an old rsb for
1890 		   this same resource from the toss list, for us to create
1891 		   a new one, look up the master locally, and find it
1892 		   already exists just before dlm_scand does the
1893 		   dir_remove() on the previous rsb. */
1894 
1895 		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1896 				       r->res_length, &ret_nodeid);
1897 		if (!error)
1898 			break;
1899 		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1900 		schedule();
1901 	}
1902 	if (error && error != -EEXIST)
1903 		return error;
1904 
1905 	if (ret_nodeid == our_nodeid) {
1906 		r->res_first_lkid = 0;
1907 		r->res_nodeid = 0;
1908 		lkb->lkb_nodeid = 0;
1909 	} else {
1910 		r->res_first_lkid = lkb->lkb_id;
1911 		r->res_nodeid = ret_nodeid;
1912 		lkb->lkb_nodeid = ret_nodeid;
1913 	}
1914 	return 0;
1915 }
1916 
1917 static void process_lookup_list(struct dlm_rsb *r)
1918 {
1919 	struct dlm_lkb *lkb, *safe;
1920 
1921 	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1922 		list_del_init(&lkb->lkb_rsb_lookup);
1923 		_request_lock(r, lkb);
1924 		schedule();
1925 	}
1926 }
1927 
1928 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1929 
1930 static void confirm_master(struct dlm_rsb *r, int error)
1931 {
1932 	struct dlm_lkb *lkb;
1933 
1934 	if (!r->res_first_lkid)
1935 		return;
1936 
1937 	switch (error) {
1938 	case 0:
1939 	case -EINPROGRESS:
1940 		r->res_first_lkid = 0;
1941 		process_lookup_list(r);
1942 		break;
1943 
1944 	case -EAGAIN:
1945 	case -EBADR:
1946 	case -ENOTBLK:
1947 		/* the remote request failed and won't be retried (it was
1948 		   a NOQUEUE, or has been canceled/unlocked); make a waiting
1949 		   lkb the first_lkid */
1950 
1951 		r->res_first_lkid = 0;
1952 
1953 		if (!list_empty(&r->res_lookup)) {
1954 			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1955 					 lkb_rsb_lookup);
1956 			list_del_init(&lkb->lkb_rsb_lookup);
1957 			r->res_first_lkid = lkb->lkb_id;
1958 			_request_lock(r, lkb);
1959 		}
1960 		break;
1961 
1962 	default:
1963 		log_error(r->res_ls, "confirm_master unknown error %d", error);
1964 	}
1965 }
1966 
1967 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1968 			 int namelen, unsigned long timeout_cs,
1969 			 void (*ast) (void *astparam),
1970 			 void *astparam,
1971 			 void (*bast) (void *astparam, int mode),
1972 			 struct dlm_args *args)
1973 {
1974 	int rv = -EINVAL;
1975 
1976 	/* check for invalid arg usage */
1977 
1978 	if (mode < 0 || mode > DLM_LOCK_EX)
1979 		goto out;
1980 
1981 	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1982 		goto out;
1983 
1984 	if (flags & DLM_LKF_CANCEL)
1985 		goto out;
1986 
1987 	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1988 		goto out;
1989 
1990 	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1991 		goto out;
1992 
1993 	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1994 		goto out;
1995 
1996 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1997 		goto out;
1998 
1999 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2000 		goto out;
2001 
2002 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2003 		goto out;
2004 
2005 	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2006 		goto out;
2007 
2008 	if (!ast || !lksb)
2009 		goto out;
2010 
2011 	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2012 		goto out;
2013 
2014 	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2015 		goto out;
2016 
2017 	/* these args will be copied to the lkb in validate_lock_args;
2018 	   it cannot be done now because when converting locks, fields in
2019 	   an active lkb cannot be modified before locking the rsb */
2020 
2021 	args->flags = flags;
2022 	args->astfn = ast;
2023 	args->astparam = astparam;
2024 	args->bastfn = bast;
2025 	args->timeout = timeout_cs;
2026 	args->mode = mode;
2027 	args->lksb = lksb;
2028 	rv = 0;
2029  out:
2030 	return rv;
2031 }
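
/* Editor's example: a minimal sketch exercising two of the argument
 * rules above.  example_args_demo() and example_ast() are hypothetical
 * helpers, not kernel source; the lksb is assumed valid. */

static void example_ast(void *astparam)
{
}

static void example_args_demo(struct dlm_lksb *lksb)
{
	struct dlm_args args;
	int rv;

	/* valid: EXPEDITE is allowed on a new NL request */
	rv = set_lock_args(DLM_LOCK_NL, lksb, DLM_LKF_EXPEDITE, 8, 0,
			   example_ast, NULL, NULL, &args);
	/* rv == 0 */

	/* invalid: EXPEDITE with any mode other than NL */
	rv = set_lock_args(DLM_LOCK_CR, lksb, DLM_LKF_EXPEDITE, 8, 0,
			   example_ast, NULL, NULL, &args);
	/* rv == -EINVAL */
}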
2032 
2033 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2034 {
2035 	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2036 		      DLM_LKF_FORCEUNLOCK))
2037 		return -EINVAL;
2038 
2039 	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2040 		return -EINVAL;
2041 
2042 	args->flags = flags;
2043 	args->astparam = astarg;
2044 	return 0;
2045 }
2046 
2047 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2048 			      struct dlm_args *args)
2049 {
2050 	int rv = -EINVAL;
2051 
2052 	if (args->flags & DLM_LKF_CONVERT) {
2053 		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2054 			goto out;
2055 
2056 		if (args->flags & DLM_LKF_QUECVT &&
2057 		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2058 			goto out;
2059 
2060 		rv = -EBUSY;
2061 		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2062 			goto out;
2063 
2064 		if (lkb->lkb_wait_type)
2065 			goto out;
2066 
2067 		if (is_overlap(lkb))
2068 			goto out;
2069 	}
2070 
2071 	lkb->lkb_exflags = args->flags;
2072 	lkb->lkb_sbflags = 0;
2073 	lkb->lkb_astfn = args->astfn;
2074 	lkb->lkb_astparam = args->astparam;
2075 	lkb->lkb_bastfn = args->bastfn;
2076 	lkb->lkb_rqmode = args->mode;
2077 	lkb->lkb_lksb = args->lksb;
2078 	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2079 	lkb->lkb_ownpid = (int) current->pid;
2080 	lkb->lkb_timeout_cs = args->timeout;
2081 	rv = 0;
2082  out:
2083 	return rv;
2084 }
2085 
2086 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2087    for success */
2088 
2089 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2090    because there may be a lookup in progress and it's valid to do
2091 	   cancel/force-unlock on it */
2092 
2093 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2094 {
2095 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2096 	int rv = -EINVAL;
2097 
2098 	if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2099 		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2100 		dlm_print_lkb(lkb);
2101 		goto out;
2102 	}
2103 
2104 	/* an lkb may still exist even though the lock is EOL'ed due to a
2105 	   cancel, unlock or failed noqueue request; an app can't use these
2106 	   locks; return the same error as if the lkid had not been found at all */
2107 
2108 	if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2109 		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2110 		rv = -ENOENT;
2111 		goto out;
2112 	}
2113 
2114 	/* an lkb may be waiting for an rsb lookup to complete where the
2115 	   lookup was initiated by another lock */
2116 
2117 	if (!list_empty(&lkb->lkb_rsb_lookup)) {
2118 		if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2119 			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2120 			list_del_init(&lkb->lkb_rsb_lookup);
2121 			queue_cast(lkb->lkb_resource, lkb,
2122 				   args->flags & DLM_LKF_CANCEL ?
2123 				   -DLM_ECANCEL : -DLM_EUNLOCK);
2124 			unhold_lkb(lkb); /* undoes create_lkb() */
2125 		}
2126 		/* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2127 		rv = -EBUSY;
2128 		goto out;
2129 	}
2130 
2131 	/* cancel not allowed with another cancel/unlock in progress */
2132 
2133 	if (args->flags & DLM_LKF_CANCEL) {
2134 		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2135 			goto out;
2136 
2137 		if (is_overlap(lkb))
2138 			goto out;
2139 
2140 		/* don't let scand try to do a cancel */
2141 		del_timeout(lkb);
2142 
2143 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
2144 			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2145 			rv = -EBUSY;
2146 			goto out;
2147 		}
2148 
2149 		switch (lkb->lkb_wait_type) {
2150 		case DLM_MSG_LOOKUP:
2151 		case DLM_MSG_REQUEST:
2152 			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2153 			rv = -EBUSY;
2154 			goto out;
2155 		case DLM_MSG_UNLOCK:
2156 		case DLM_MSG_CANCEL:
2157 			goto out;
2158 		}
2159 		/* add_to_waiters() will set OVERLAP_CANCEL */
2160 		goto out_ok;
2161 	}
2162 
2163 	/* do we need to allow a force-unlock if there's a normal unlock
2164 	   already in progress?  in what conditions could the normal unlock
2165 	   fail such that we'd want to send a force-unlock to be sure? */
2166 
2167 	if (args->flags & DLM_LKF_FORCEUNLOCK) {
2168 		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2169 			goto out;
2170 
2171 		if (is_overlap_unlock(lkb))
2172 			goto out;
2173 
2174 		/* don't let scand try to do a cancel */
2175 		del_timeout(lkb);
2176 
2177 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
2178 			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2179 			rv = -EBUSY;
2180 			goto out;
2181 		}
2182 
2183 		switch (lkb->lkb_wait_type) {
2184 		case DLM_MSG_LOOKUP:
2185 		case DLM_MSG_REQUEST:
2186 			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2187 			rv = -EBUSY;
2188 			goto out;
2189 		case DLM_MSG_UNLOCK:
2190 			goto out;
2191 		}
2192 		/* add_to_waiters() will set OVERLAP_UNLOCK */
2193 		goto out_ok;
2194 	}
2195 
2196 	/* normal unlock not allowed if there's any op in progress */
2197 	rv = -EBUSY;
2198 	if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2199 		goto out;
2200 
2201  out_ok:
2202 	/* an overlapping op shouldn't blow away exflags from other op */
2203 	lkb->lkb_exflags |= args->flags;
2204 	lkb->lkb_sbflags = 0;
2205 	lkb->lkb_astparam = args->astparam;
2206 	rv = 0;
2207  out:
2208 	if (rv)
2209 		log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2210 			  lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2211 			  args->flags, lkb->lkb_wait_type,
2212 			  lkb->lkb_resource->res_name);
2213 	return rv;
2214 }
2215 
2216 /*
2217  * Four stage 4 varieties:
2218  * do_request(), do_convert(), do_unlock(), do_cancel()
2219  * These are called on the master node for the given lock and
2220  * from the central locking logic.
2221  */
2222 
2223 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2224 {
2225 	int error = 0;
2226 
2227 	if (can_be_granted(r, lkb, 1, NULL)) {
2228 		grant_lock(r, lkb);
2229 		queue_cast(r, lkb, 0);
2230 		goto out;
2231 	}
2232 
2233 	if (can_be_queued(lkb)) {
2234 		error = -EINPROGRESS;
2235 		add_lkb(r, lkb, DLM_LKSTS_WAITING);
2236 		send_blocking_asts(r, lkb);
2237 		add_timeout(lkb);
2238 		goto out;
2239 	}
2240 
2241 	error = -EAGAIN;
2242 	if (force_blocking_asts(lkb))
2243 		send_blocking_asts_all(r, lkb);
2244 	queue_cast(r, lkb, -EAGAIN);
2245 
2246  out:
2247 	return error;
2248 }
2249 
2250 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2251 {
2252 	int error = 0;
2253 	int deadlk = 0;
2254 
2255 	/* changing an existing lock may allow others to be granted */
2256 
2257 	if (can_be_granted(r, lkb, 1, &deadlk)) {
2258 		grant_lock(r, lkb);
2259 		queue_cast(r, lkb, 0);
2260 		grant_pending_locks(r);
2261 		goto out;
2262 	}
2263 
2264 	/* can_be_granted() detected that this lock would block in a conversion
2265 	   deadlock, so we leave it on the granted queue and return EDEADLK in
2266 	   the ast for the convert.  (a worked example follows this function) */
2267 
2268 	if (deadlk) {
2269 		/* it's left on the granted queue */
2270 		log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2271 			  lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2272 			  lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2273 		revert_lock(r, lkb);
2274 		queue_cast(r, lkb, -EDEADLK);
2275 		error = -EDEADLK;
2276 		goto out;
2277 	}
2278 
2279 	/* is_demoted() means the can_be_granted() above set the grmode
2280 	   to NL, and left us on the granted queue.  This auto-demotion
2281 	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
2282 	   now grantable.  We have to try to grant other converting locks
2283 	   before we try again to grant this one. */
2284 
2285 	if (is_demoted(lkb)) {
2286 		grant_pending_convert(r, DLM_LOCK_IV, NULL);
2287 		if (_can_be_granted(r, lkb, 1)) {
2288 			grant_lock(r, lkb);
2289 			queue_cast(r, lkb, 0);
2290 			grant_pending_locks(r);
2291 			goto out;
2292 		}
2293 		/* else fall through and move to convert queue */
2294 	}
2295 
2296 	if (can_be_queued(lkb)) {
2297 		error = -EINPROGRESS;
2298 		del_lkb(r, lkb);
2299 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2300 		send_blocking_asts(r, lkb);
2301 		add_timeout(lkb);
2302 		goto out;
2303 	}
2304 
2305 	error = -EAGAIN;
2306 	if (force_blocking_asts(lkb))
2307 		send_blocking_asts_all(r, lkb);
2308 	queue_cast(r, lkb, -EAGAIN);
2309 
2310  out:
2311 	return error;
2312 }
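
/* Editor's example: a worked conversion deadlock for the two branches
 * above.  Suppose locks A and B each hold PR on the same rsb and both
 * convert to EX.  Neither conversion can ever be granted, since each is
 * blocked by the other's granted PR.  Without DLM_LKF_CONVDEADLK,
 * can_be_granted() sets deadlk and the conversion chosen as victim is
 * reverted and fails with -EDEADLK (first branch).  With
 * DLM_LKF_CONVDEADLK, the victim's grmode is silently demoted to NL
 * instead (is_demoted()), which may make the other conversion -- and
 * possibly the victim's own retry -- grantable (second branch). */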
2313 
2314 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2315 {
2316 	remove_lock(r, lkb);
2317 	queue_cast(r, lkb, -DLM_EUNLOCK);
2318 	grant_pending_locks(r);
2319 	return -DLM_EUNLOCK;
2320 }
2321 
2322 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2323 
2324 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2325 {
2326 	int error;
2327 
2328 	error = revert_lock(r, lkb);
2329 	if (error) {
2330 		queue_cast(r, lkb, -DLM_ECANCEL);
2331 		grant_pending_locks(r);
2332 		return -DLM_ECANCEL;
2333 	}
2334 	return 0;
2335 }
2336 
2337 /*
2338  * Four stage 3 varieties:
2339  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2340  */
2341 
2342 /* add a new lkb to a possibly new rsb, called by requesting process */
2343 
2344 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2345 {
2346 	int error;
2347 
2348 	/* set_master: sets lkb nodeid from r */
2349 
2350 	error = set_master(r, lkb);
2351 	if (error < 0)
2352 		goto out;
2353 	if (error) {
2354 		error = 0;
2355 		goto out;
2356 	}
2357 
2358 	if (is_remote(r))
2359 		/* receive_request() calls do_request() on remote node */
2360 		error = send_request(r, lkb);
2361 	else
2362 		error = do_request(r, lkb);
2363  out:
2364 	return error;
2365 }
2366 
2367 /* change some property of an existing lkb, e.g. mode */
2368 
2369 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2370 {
2371 	int error;
2372 
2373 	if (is_remote(r))
2374 		/* receive_convert() calls do_convert() on remote node */
2375 		error = send_convert(r, lkb);
2376 	else
2377 		error = do_convert(r, lkb);
2378 
2379 	return error;
2380 }
2381 
2382 /* remove an existing lkb from the granted queue */
2383 
2384 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2385 {
2386 	int error;
2387 
2388 	if (is_remote(r))
2389 		/* receive_unlock() calls do_unlock() on remote node */
2390 		error = send_unlock(r, lkb);
2391 	else
2392 		error = do_unlock(r, lkb);
2393 
2394 	return error;
2395 }
2396 
2397 /* remove an existing lkb from the convert or wait queue */
2398 
2399 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2400 {
2401 	int error;
2402 
2403 	if (is_remote(r))
2404 		/* receive_cancel() calls do_cancel() on remote node */
2405 		error = send_cancel(r, lkb);
2406 	else
2407 		error = do_cancel(r, lkb);
2408 
2409 	return error;
2410 }
2411 
2412 /*
2413  * Four stage 2 varieties:
2414  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2415  */
2416 
2417 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2418 			int len, struct dlm_args *args)
2419 {
2420 	struct dlm_rsb *r;
2421 	int error;
2422 
2423 	error = validate_lock_args(ls, lkb, args);
2424 	if (error)
2425 		goto out;
2426 
2427 	error = find_rsb(ls, name, len, R_CREATE, &r);
2428 	if (error)
2429 		goto out;
2430 
2431 	lock_rsb(r);
2432 
2433 	attach_lkb(r, lkb);
2434 	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2435 
2436 	error = _request_lock(r, lkb);
2437 
2438 	unlock_rsb(r);
2439 	put_rsb(r);
2440 
2441  out:
2442 	return error;
2443 }
2444 
2445 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2446 			struct dlm_args *args)
2447 {
2448 	struct dlm_rsb *r;
2449 	int error;
2450 
2451 	r = lkb->lkb_resource;
2452 
2453 	hold_rsb(r);
2454 	lock_rsb(r);
2455 
2456 	error = validate_lock_args(ls, lkb, args);
2457 	if (error)
2458 		goto out;
2459 
2460 	error = _convert_lock(r, lkb);
2461  out:
2462 	unlock_rsb(r);
2463 	put_rsb(r);
2464 	return error;
2465 }
2466 
2467 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2468 		       struct dlm_args *args)
2469 {
2470 	struct dlm_rsb *r;
2471 	int error;
2472 
2473 	r = lkb->lkb_resource;
2474 
2475 	hold_rsb(r);
2476 	lock_rsb(r);
2477 
2478 	error = validate_unlock_args(lkb, args);
2479 	if (error)
2480 		goto out;
2481 
2482 	error = _unlock_lock(r, lkb);
2483  out:
2484 	unlock_rsb(r);
2485 	put_rsb(r);
2486 	return error;
2487 }
2488 
2489 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2490 		       struct dlm_args *args)
2491 {
2492 	struct dlm_rsb *r;
2493 	int error;
2494 
2495 	r = lkb->lkb_resource;
2496 
2497 	hold_rsb(r);
2498 	lock_rsb(r);
2499 
2500 	error = validate_unlock_args(lkb, args);
2501 	if (error)
2502 		goto out;
2503 
2504 	error = _cancel_lock(r, lkb);
2505  out:
2506 	unlock_rsb(r);
2507 	put_rsb(r);
2508 	return error;
2509 }
2510 
2511 /*
2512  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2513  */
2514 
2515 int dlm_lock(dlm_lockspace_t *lockspace,
2516 	     int mode,
2517 	     struct dlm_lksb *lksb,
2518 	     uint32_t flags,
2519 	     void *name,
2520 	     unsigned int namelen,
2521 	     uint32_t parent_lkid,
2522 	     void (*ast) (void *astarg),
2523 	     void *astarg,
2524 	     void (*bast) (void *astarg, int mode))
2525 {
2526 	struct dlm_ls *ls;
2527 	struct dlm_lkb *lkb;
2528 	struct dlm_args args;
2529 	int error, convert = flags & DLM_LKF_CONVERT;
2530 
2531 	ls = dlm_find_lockspace_local(lockspace);
2532 	if (!ls)
2533 		return -EINVAL;
2534 
2535 	dlm_lock_recovery(ls);
2536 
2537 	if (convert)
2538 		error = find_lkb(ls, lksb->sb_lkid, &lkb);
2539 	else
2540 		error = create_lkb(ls, &lkb);
2541 
2542 	if (error)
2543 		goto out;
2544 
2545 	error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2546 			      astarg, bast, &args);
2547 	if (error)
2548 		goto out_put;
2549 
2550 	if (convert)
2551 		error = convert_lock(ls, lkb, &args);
2552 	else
2553 		error = request_lock(ls, lkb, name, namelen, &args);
2554 
2555 	if (error == -EINPROGRESS)
2556 		error = 0;
2557  out_put:
2558 	if (convert || error)
2559 		__put_lkb(ls, lkb);
2560 	if (error == -EAGAIN || error == -EDEADLK)
2561 		error = 0;
2562  out:
2563 	dlm_unlock_recovery(ls);
2564 	dlm_put_lockspace(ls);
2565 	return error;
2566 }
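
/* Editor's example: a minimal in-kernel caller sketch for dlm_lock()
 * above.  It assumes `ls` was obtained from dlm_new_lockspace(); the
 * struct, the helpers and the use of a completion are illustrative
 * assumptions, not kernel API.  The caller must wait for the completion
 * ast before trusting lksb.sb_status. */

struct example_lock {
	struct dlm_lksb lksb;
	struct completion done;
};

static void example_lock_ast(void *astparam)
{
	struct example_lock *el = astparam;

	/* sb_status is now valid: 0, -EAGAIN, -EDEADLK, ... */
	complete(&el->done);
}

static int example_take_lock(dlm_lockspace_t *ls, struct example_lock *el)
{
	int error;

	init_completion(&el->done);

	error = dlm_lock(ls, DLM_LOCK_EX, &el->lksb, 0, "example-resource",
			 16, 0, example_lock_ast, el, NULL);
	if (error)
		return error;

	wait_for_completion(&el->done);
	return el->lksb.sb_status;
}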
2567 
2568 int dlm_unlock(dlm_lockspace_t *lockspace,
2569 	       uint32_t lkid,
2570 	       uint32_t flags,
2571 	       struct dlm_lksb *lksb,
2572 	       void *astarg)
2573 {
2574 	struct dlm_ls *ls;
2575 	struct dlm_lkb *lkb;
2576 	struct dlm_args args;
2577 	int error;
2578 
2579 	ls = dlm_find_lockspace_local(lockspace);
2580 	if (!ls)
2581 		return -EINVAL;
2582 
2583 	dlm_lock_recovery(ls);
2584 
2585 	error = find_lkb(ls, lkid, &lkb);
2586 	if (error)
2587 		goto out;
2588 
2589 	error = set_unlock_args(flags, astarg, &args);
2590 	if (error)
2591 		goto out_put;
2592 
2593 	if (flags & DLM_LKF_CANCEL)
2594 		error = cancel_lock(ls, lkb, &args);
2595 	else
2596 		error = unlock_lock(ls, lkb, &args);
2597 
2598 	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2599 		error = 0;
2600 	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2601 		error = 0;
2602  out_put:
2603 	dlm_put_lkb(lkb);
2604  out:
2605 	dlm_unlock_recovery(ls);
2606 	dlm_put_lockspace(ls);
2607 	return error;
2608 }
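
/* Editor's example: the matching release for the example_take_lock()
 * sketch above.  Unlock completion is also delivered through the
 * completion ast, with sb_status set to -DLM_EUNLOCK (or -DLM_ECANCEL
 * for a cancel); the helper is an illustrative assumption. */

static int example_drop_lock(dlm_lockspace_t *ls, struct example_lock *el)
{
	int error;

	init_completion(&el->done);

	error = dlm_unlock(ls, el->lksb.sb_lkid, 0, &el->lksb, el);
	if (error)
		return error;

	wait_for_completion(&el->done);
	return el->lksb.sb_status == -DLM_EUNLOCK ? 0 : el->lksb.sb_status;
}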
2609 
2610 /*
2611  * send/receive routines for remote operations and replies
2612  *
2613  * send_args
2614  * send_common
2615  * send_request			receive_request
2616  * send_convert			receive_convert
2617  * send_unlock			receive_unlock
2618  * send_cancel			receive_cancel
2619  * send_grant			receive_grant
2620  * send_bast			receive_bast
2621  * send_lookup			receive_lookup
2622  * send_remove			receive_remove
2623  *
2624  * 				send_common_reply
2625  * receive_request_reply	send_request_reply
2626  * receive_convert_reply	send_convert_reply
2627  * receive_unlock_reply		send_unlock_reply
2628  * receive_cancel_reply		send_cancel_reply
2629  * receive_lookup_reply		send_lookup_reply
2630  */
2631 
2632 static int _create_message(struct dlm_ls *ls, int mb_len,
2633 			   int to_nodeid, int mstype,
2634 			   struct dlm_message **ms_ret,
2635 			   struct dlm_mhandle **mh_ret)
2636 {
2637 	struct dlm_message *ms;
2638 	struct dlm_mhandle *mh;
2639 	char *mb;
2640 
2641 	/* get_buffer gives us a message handle (mh) that we need to
2642 	   pass into lowcomms_commit and a message buffer (mb) that we
2643 	   write our data into */
2644 
2645 	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, ls->ls_allocation, &mb);
2646 	if (!mh)
2647 		return -ENOBUFS;
2648 
2649 	memset(mb, 0, mb_len);
2650 
2651 	ms = (struct dlm_message *) mb;
2652 
2653 	ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2654 	ms->m_header.h_lockspace = ls->ls_global_id;
2655 	ms->m_header.h_nodeid = dlm_our_nodeid();
2656 	ms->m_header.h_length = mb_len;
2657 	ms->m_header.h_cmd = DLM_MSG;
2658 
2659 	ms->m_type = mstype;
2660 
2661 	*mh_ret = mh;
2662 	*ms_ret = ms;
2663 	return 0;
2664 }
2665 
2666 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2667 			  int to_nodeid, int mstype,
2668 			  struct dlm_message **ms_ret,
2669 			  struct dlm_mhandle **mh_ret)
2670 {
2671 	int mb_len = sizeof(struct dlm_message);
2672 
2673 	switch (mstype) {
2674 	case DLM_MSG_REQUEST:
2675 	case DLM_MSG_LOOKUP:
2676 	case DLM_MSG_REMOVE:
2677 		mb_len += r->res_length;
2678 		break;
2679 	case DLM_MSG_CONVERT:
2680 	case DLM_MSG_UNLOCK:
2681 	case DLM_MSG_REQUEST_REPLY:
2682 	case DLM_MSG_CONVERT_REPLY:
2683 	case DLM_MSG_GRANT:
2684 		if (lkb && lkb->lkb_lvbptr)
2685 			mb_len += r->res_ls->ls_lvblen;
2686 		break;
2687 	}
2688 
2689 	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2690 			       ms_ret, mh_ret);
2691 }
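
/* Editor's note: a worked size example for the switch above.  For a
 * DLM_MSG_REQUEST on a resource with a 16-byte name,
 *
 *     mb_len = sizeof(struct dlm_message) + 16
 *
 * and the name travels in ms->m_extra (copied by send_args() below), so
 * the receiver recovers it as
 *
 *     namelen = receive_extralen(ms)
 *             = ms->m_header.h_length - sizeof(struct dlm_message) = 16
 *
 * A DLM_MSG_CONVERT carrying an LVB instead puts ls_lvblen bytes of LVB
 * data in the same m_extra area. */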
2692 
2693 /* further lowcomms enhancements or alternate implementations may make
2694    the return value from this function useful at some point */
2695 
2696 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2697 {
2698 	dlm_message_out(ms);
2699 	dlm_lowcomms_commit_buffer(mh);
2700 	return 0;
2701 }
2702 
2703 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2704 		      struct dlm_message *ms)
2705 {
2706 	ms->m_nodeid   = lkb->lkb_nodeid;
2707 	ms->m_pid      = lkb->lkb_ownpid;
2708 	ms->m_lkid     = lkb->lkb_id;
2709 	ms->m_remid    = lkb->lkb_remid;
2710 	ms->m_exflags  = lkb->lkb_exflags;
2711 	ms->m_sbflags  = lkb->lkb_sbflags;
2712 	ms->m_flags    = lkb->lkb_flags;
2713 	ms->m_lvbseq   = lkb->lkb_lvbseq;
2714 	ms->m_status   = lkb->lkb_status;
2715 	ms->m_grmode   = lkb->lkb_grmode;
2716 	ms->m_rqmode   = lkb->lkb_rqmode;
2717 	ms->m_hash     = r->res_hash;
2718 
2719 	/* m_result and m_bastmode are set from function args,
2720 	   not from lkb fields */
2721 
2722 	if (lkb->lkb_bastfn)
2723 		ms->m_asts |= AST_BAST;
2724 	if (lkb->lkb_astfn)
2725 		ms->m_asts |= AST_COMP;
2726 
2727 	/* compare with switch in create_message; send_remove() doesn't
2728 	   use send_args() */
2729 
2730 	switch (ms->m_type) {
2731 	case DLM_MSG_REQUEST:
2732 	case DLM_MSG_LOOKUP:
2733 		memcpy(ms->m_extra, r->res_name, r->res_length);
2734 		break;
2735 	case DLM_MSG_CONVERT:
2736 	case DLM_MSG_UNLOCK:
2737 	case DLM_MSG_REQUEST_REPLY:
2738 	case DLM_MSG_CONVERT_REPLY:
2739 	case DLM_MSG_GRANT:
2740 		if (!lkb->lkb_lvbptr)
2741 			break;
2742 		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2743 		break;
2744 	}
2745 }
2746 
2747 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2748 {
2749 	struct dlm_message *ms;
2750 	struct dlm_mhandle *mh;
2751 	int to_nodeid, error;
2752 
2753 	error = add_to_waiters(lkb, mstype);
2754 	if (error)
2755 		return error;
2756 
2757 	to_nodeid = r->res_nodeid;
2758 
2759 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2760 	if (error)
2761 		goto fail;
2762 
2763 	send_args(r, lkb, ms);
2764 
2765 	error = send_message(mh, ms);
2766 	if (error)
2767 		goto fail;
2768 	return 0;
2769 
2770  fail:
2771 	remove_from_waiters(lkb, msg_reply_type(mstype));
2772 	return error;
2773 }
2774 
2775 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2776 {
2777 	return send_common(r, lkb, DLM_MSG_REQUEST);
2778 }
2779 
2780 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2781 {
2782 	int error;
2783 
2784 	error = send_common(r, lkb, DLM_MSG_CONVERT);
2785 
2786 	/* down conversions go without a reply from the master */
2787 	if (!error && down_conversion(lkb)) {
2788 		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2789 		r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2790 		r->res_ls->ls_stub_ms.m_result = 0;
2791 		r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2792 		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2793 	}
2794 
2795 	return error;
2796 }
2797 
2798 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2799    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2800    that the master is still correct. */
2801 
2802 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2803 {
2804 	return send_common(r, lkb, DLM_MSG_UNLOCK);
2805 }
2806 
2807 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2808 {
2809 	return send_common(r, lkb, DLM_MSG_CANCEL);
2810 }
2811 
2812 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2813 {
2814 	struct dlm_message *ms;
2815 	struct dlm_mhandle *mh;
2816 	int to_nodeid, error;
2817 
2818 	to_nodeid = lkb->lkb_nodeid;
2819 
2820 	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2821 	if (error)
2822 		goto out;
2823 
2824 	send_args(r, lkb, ms);
2825 
2826 	ms->m_result = 0;
2827 
2828 	error = send_message(mh, ms);
2829  out:
2830 	return error;
2831 }
2832 
2833 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2834 {
2835 	struct dlm_message *ms;
2836 	struct dlm_mhandle *mh;
2837 	int to_nodeid, error;
2838 
2839 	to_nodeid = lkb->lkb_nodeid;
2840 
2841 	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2842 	if (error)
2843 		goto out;
2844 
2845 	send_args(r, lkb, ms);
2846 
2847 	ms->m_bastmode = mode;
2848 
2849 	error = send_message(mh, ms);
2850  out:
2851 	return error;
2852 }
2853 
2854 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2855 {
2856 	struct dlm_message *ms;
2857 	struct dlm_mhandle *mh;
2858 	int to_nodeid, error;
2859 
2860 	error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2861 	if (error)
2862 		return error;
2863 
2864 	to_nodeid = dlm_dir_nodeid(r);
2865 
2866 	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2867 	if (error)
2868 		goto fail;
2869 
2870 	send_args(r, lkb, ms);
2871 
2872 	error = send_message(mh, ms);
2873 	if (error)
2874 		goto fail;
2875 	return 0;
2876 
2877  fail:
2878 	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
2879 	return error;
2880 }
2881 
2882 static int send_remove(struct dlm_rsb *r)
2883 {
2884 	struct dlm_message *ms;
2885 	struct dlm_mhandle *mh;
2886 	int to_nodeid, error;
2887 
2888 	to_nodeid = dlm_dir_nodeid(r);
2889 
2890 	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2891 	if (error)
2892 		goto out;
2893 
2894 	memcpy(ms->m_extra, r->res_name, r->res_length);
2895 	ms->m_hash = r->res_hash;
2896 
2897 	error = send_message(mh, ms);
2898  out:
2899 	return error;
2900 }
2901 
2902 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2903 			     int mstype, int rv)
2904 {
2905 	struct dlm_message *ms;
2906 	struct dlm_mhandle *mh;
2907 	int to_nodeid, error;
2908 
2909 	to_nodeid = lkb->lkb_nodeid;
2910 
2911 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2912 	if (error)
2913 		goto out;
2914 
2915 	send_args(r, lkb, ms);
2916 
2917 	ms->m_result = rv;
2918 
2919 	error = send_message(mh, ms);
2920  out:
2921 	return error;
2922 }
2923 
2924 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2925 {
2926 	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2927 }
2928 
2929 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2930 {
2931 	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2932 }
2933 
2934 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2935 {
2936 	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2937 }
2938 
2939 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2940 {
2941 	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2942 }
2943 
2944 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2945 			     int ret_nodeid, int rv)
2946 {
2947 	struct dlm_rsb *r = &ls->ls_stub_rsb;
2948 	struct dlm_message *ms;
2949 	struct dlm_mhandle *mh;
2950 	int error, nodeid = ms_in->m_header.h_nodeid;
2951 
2952 	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2953 	if (error)
2954 		goto out;
2955 
2956 	ms->m_lkid = ms_in->m_lkid;
2957 	ms->m_result = rv;
2958 	ms->m_nodeid = ret_nodeid;
2959 
2960 	error = send_message(mh, ms);
2961  out:
2962 	return error;
2963 }
2964 
2965 /* which args we save from a received message depends heavily on the type
2966    of message, unlike the send side where we can safely send everything about
2967    the lkb for any type of message */
2968 
2969 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2970 {
2971 	lkb->lkb_exflags = ms->m_exflags;
2972 	lkb->lkb_sbflags = ms->m_sbflags;
2973 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2974 		         (ms->m_flags & 0x0000FFFF);
2975 }
2976 
2977 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2978 {
2979 	lkb->lkb_sbflags = ms->m_sbflags;
2980 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2981 		         (ms->m_flags & 0x0000FFFF);
2982 }
2983 
2984 static int receive_extralen(struct dlm_message *ms)
2985 {
2986 	return (ms->m_header.h_length - sizeof(struct dlm_message));
2987 }
2988 
2989 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2990 		       struct dlm_message *ms)
2991 {
2992 	int len;
2993 
2994 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2995 		if (!lkb->lkb_lvbptr)
2996 			lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
2997 		if (!lkb->lkb_lvbptr)
2998 			return -ENOMEM;
2999 		len = receive_extralen(ms);
3000 		if (len > DLM_RESNAME_MAXLEN)
3001 			len = DLM_RESNAME_MAXLEN;
3002 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3003 	}
3004 	return 0;
3005 }
3006 
3007 static void fake_bastfn(void *astparam, int mode)
3008 {
3009 	log_print("fake_bastfn should not be called");
3010 }
3011 
3012 static void fake_astfn(void *astparam)
3013 {
3014 	log_print("fake_astfn should not be called");
3015 }
3016 
3017 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3018 				struct dlm_message *ms)
3019 {
3020 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
3021 	lkb->lkb_ownpid = ms->m_pid;
3022 	lkb->lkb_remid = ms->m_lkid;
3023 	lkb->lkb_grmode = DLM_LOCK_IV;
3024 	lkb->lkb_rqmode = ms->m_rqmode;
3025 
3026 	lkb->lkb_bastfn = (ms->m_asts & AST_BAST) ? &fake_bastfn : NULL;
3027 	lkb->lkb_astfn = (ms->m_asts & AST_COMP) ? &fake_astfn : NULL;
3028 
3029 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3030 		/* lkb was just created so there won't be an lvb yet */
3031 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3032 		if (!lkb->lkb_lvbptr)
3033 			return -ENOMEM;
3034 	}
3035 
3036 	return 0;
3037 }
3038 
3039 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3040 				struct dlm_message *ms)
3041 {
3042 	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3043 		return -EBUSY;
3044 
3045 	if (receive_lvb(ls, lkb, ms))
3046 		return -ENOMEM;
3047 
3048 	lkb->lkb_rqmode = ms->m_rqmode;
3049 	lkb->lkb_lvbseq = ms->m_lvbseq;
3050 
3051 	return 0;
3052 }
3053 
3054 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3055 			       struct dlm_message *ms)
3056 {
3057 	if (receive_lvb(ls, lkb, ms))
3058 		return -ENOMEM;
3059 	return 0;
3060 }
3061 
3062 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3063    uses to send a reply and that the remote end uses to process the reply. */
3064 
3065 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3066 {
3067 	struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3068 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
3069 	lkb->lkb_remid = ms->m_lkid;
3070 }
3071 
3072 /* This is called after the rsb is locked so that we can safely inspect
3073    fields in the lkb. */
3074 
3075 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3076 {
3077 	int from = ms->m_header.h_nodeid;
3078 	int error = 0;
3079 
3080 	switch (ms->m_type) {
3081 	case DLM_MSG_CONVERT:
3082 	case DLM_MSG_UNLOCK:
3083 	case DLM_MSG_CANCEL:
3084 		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3085 			error = -EINVAL;
3086 		break;
3087 
3088 	case DLM_MSG_CONVERT_REPLY:
3089 	case DLM_MSG_UNLOCK_REPLY:
3090 	case DLM_MSG_CANCEL_REPLY:
3091 	case DLM_MSG_GRANT:
3092 	case DLM_MSG_BAST:
3093 		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3094 			error = -EINVAL;
3095 		break;
3096 
3097 	case DLM_MSG_REQUEST_REPLY:
3098 		if (!is_process_copy(lkb))
3099 			error = -EINVAL;
3100 		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3101 			error = -EINVAL;
3102 		break;
3103 
3104 	default:
3105 		error = -EINVAL;
3106 	}
3107 
3108 	if (error)
3109 		log_error(lkb->lkb_resource->res_ls,
3110 			  "ignore invalid message %d from %d %x %x %x %d",
3111 			  ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3112 			  lkb->lkb_flags, lkb->lkb_nodeid);
3113 	return error;
3114 }
3115 
3116 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3117 {
3118 	struct dlm_lkb *lkb;
3119 	struct dlm_rsb *r;
3120 	int error, namelen;
3121 
3122 	error = create_lkb(ls, &lkb);
3123 	if (error)
3124 		goto fail;
3125 
3126 	receive_flags(lkb, ms);
3127 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
3128 	error = receive_request_args(ls, lkb, ms);
3129 	if (error) {
3130 		__put_lkb(ls, lkb);
3131 		goto fail;
3132 	}
3133 
3134 	namelen = receive_extralen(ms);
3135 
3136 	error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3137 	if (error) {
3138 		__put_lkb(ls, lkb);
3139 		goto fail;
3140 	}
3141 
3142 	lock_rsb(r);
3143 
3144 	attach_lkb(r, lkb);
3145 	error = do_request(r, lkb);
3146 	send_request_reply(r, lkb, error);
3147 
3148 	unlock_rsb(r);
3149 	put_rsb(r);
3150 
3151 	if (error == -EINPROGRESS)
3152 		error = 0;
3153 	if (error)
3154 		dlm_put_lkb(lkb);
3155 	return;
3156 
3157  fail:
3158 	setup_stub_lkb(ls, ms);
3159 	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3160 }
3161 
3162 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3163 {
3164 	struct dlm_lkb *lkb;
3165 	struct dlm_rsb *r;
3166 	int error, reply = 1;
3167 
3168 	error = find_lkb(ls, ms->m_remid, &lkb);
3169 	if (error)
3170 		goto fail;
3171 
3172 	r = lkb->lkb_resource;
3173 
3174 	hold_rsb(r);
3175 	lock_rsb(r);
3176 
3177 	error = validate_message(lkb, ms);
3178 	if (error)
3179 		goto out;
3180 
3181 	receive_flags(lkb, ms);
3182 	error = receive_convert_args(ls, lkb, ms);
3183 	if (error)
3184 		goto out_reply;
3185 	reply = !down_conversion(lkb);
3186 
3187 	error = do_convert(r, lkb);
3188  out_reply:
3189 	if (reply)
3190 		send_convert_reply(r, lkb, error);
3191  out:
3192 	unlock_rsb(r);
3193 	put_rsb(r);
3194 	dlm_put_lkb(lkb);
3195 	return;
3196 
3197  fail:
3198 	setup_stub_lkb(ls, ms);
3199 	send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3200 }
3201 
3202 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3203 {
3204 	struct dlm_lkb *lkb;
3205 	struct dlm_rsb *r;
3206 	int error;
3207 
3208 	error = find_lkb(ls, ms->m_remid, &lkb);
3209 	if (error)
3210 		goto fail;
3211 
3212 	r = lkb->lkb_resource;
3213 
3214 	hold_rsb(r);
3215 	lock_rsb(r);
3216 
3217 	error = validate_message(lkb, ms);
3218 	if (error)
3219 		goto out;
3220 
3221 	receive_flags(lkb, ms);
3222 	error = receive_unlock_args(ls, lkb, ms);
3223 	if (error)
3224 		goto out_reply;
3225 
3226 	error = do_unlock(r, lkb);
3227  out_reply:
3228 	send_unlock_reply(r, lkb, error);
3229  out:
3230 	unlock_rsb(r);
3231 	put_rsb(r);
3232 	dlm_put_lkb(lkb);
3233 	return;
3234 
3235  fail:
3236 	setup_stub_lkb(ls, ms);
3237 	send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3238 }
3239 
3240 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3241 {
3242 	struct dlm_lkb *lkb;
3243 	struct dlm_rsb *r;
3244 	int error;
3245 
3246 	error = find_lkb(ls, ms->m_remid, &lkb);
3247 	if (error)
3248 		goto fail;
3249 
3250 	receive_flags(lkb, ms);
3251 
3252 	r = lkb->lkb_resource;
3253 
3254 	hold_rsb(r);
3255 	lock_rsb(r);
3256 
3257 	error = validate_message(lkb, ms);
3258 	if (error)
3259 		goto out;
3260 
3261 	error = do_cancel(r, lkb);
3262 	send_cancel_reply(r, lkb, error);
3263  out:
3264 	unlock_rsb(r);
3265 	put_rsb(r);
3266 	dlm_put_lkb(lkb);
3267 	return;
3268 
3269  fail:
3270 	setup_stub_lkb(ls, ms);
3271 	send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3272 }
3273 
3274 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3275 {
3276 	struct dlm_lkb *lkb;
3277 	struct dlm_rsb *r;
3278 	int error;
3279 
3280 	error = find_lkb(ls, ms->m_remid, &lkb);
3281 	if (error) {
3282 		log_debug(ls, "receive_grant from %d no lkb %x",
3283 			  ms->m_header.h_nodeid, ms->m_remid);
3284 		return;
3285 	}
3286 
3287 	r = lkb->lkb_resource;
3288 
3289 	hold_rsb(r);
3290 	lock_rsb(r);
3291 
3292 	error = validate_message(lkb, ms);
3293 	if (error)
3294 		goto out;
3295 
3296 	receive_flags_reply(lkb, ms);
3297 	if (is_altmode(lkb))
3298 		munge_altmode(lkb, ms);
3299 	grant_lock_pc(r, lkb, ms);
3300 	queue_cast(r, lkb, 0);
3301  out:
3302 	unlock_rsb(r);
3303 	put_rsb(r);
3304 	dlm_put_lkb(lkb);
3305 }
3306 
3307 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3308 {
3309 	struct dlm_lkb *lkb;
3310 	struct dlm_rsb *r;
3311 	int error;
3312 
3313 	error = find_lkb(ls, ms->m_remid, &lkb);
3314 	if (error) {
3315 		log_debug(ls, "receive_bast from %d no lkb %x",
3316 			  ms->m_header.h_nodeid, ms->m_remid);
3317 		return;
3318 	}
3319 
3320 	r = lkb->lkb_resource;
3321 
3322 	hold_rsb(r);
3323 	lock_rsb(r);
3324 
3325 	error = validate_message(lkb, ms);
3326 	if (error)
3327 		goto out;
3328 
3329 	queue_bast(r, lkb, ms->m_bastmode);
3330  out:
3331 	unlock_rsb(r);
3332 	put_rsb(r);
3333 	dlm_put_lkb(lkb);
3334 }
3335 
3336 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3337 {
3338 	int len, error, ret_nodeid = -1, dir_nodeid, from_nodeid, our_nodeid;
3339 
3340 	from_nodeid = ms->m_header.h_nodeid;
3341 	our_nodeid = dlm_our_nodeid();
3342 
3343 	len = receive_extralen(ms);
3344 
3345 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3346 	if (dir_nodeid != our_nodeid) {
3347 		log_error(ls, "lookup dir_nodeid %d from %d",
3348 			  dir_nodeid, from_nodeid);
3349 		error = -EINVAL;
3350 		ret_nodeid = -1;
3351 		goto out;
3352 	}
3353 
3354 	error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3355 
3356 	/* Optimization: we're master so treat lookup as a request */
3357 	if (!error && ret_nodeid == our_nodeid) {
3358 		receive_request(ls, ms);
3359 		return;
3360 	}
3361  out:
3362 	send_lookup_reply(ls, ms, ret_nodeid, error);
3363 }
3364 
3365 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3366 {
3367 	int len, dir_nodeid, from_nodeid;
3368 
3369 	from_nodeid = ms->m_header.h_nodeid;
3370 
3371 	len = receive_extralen(ms);
3372 
3373 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3374 	if (dir_nodeid != dlm_our_nodeid()) {
3375 		log_error(ls, "remove dir entry dir_nodeid %d from %d",
3376 			  dir_nodeid, from_nodeid);
3377 		return;
3378 	}
3379 
3380 	dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3381 }
3382 
3383 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3384 {
3385 	do_purge(ls, ms->m_nodeid, ms->m_pid);
3386 }
3387 
3388 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3389 {
3390 	struct dlm_lkb *lkb;
3391 	struct dlm_rsb *r;
3392 	int error, mstype, result;
3393 
3394 	error = find_lkb(ls, ms->m_remid, &lkb);
3395 	if (error) {
3396 		log_debug(ls, "receive_request_reply from %d no lkb %x",
3397 			  ms->m_header.h_nodeid, ms->m_remid);
3398 		return;
3399 	}
3400 
3401 	r = lkb->lkb_resource;
3402 	hold_rsb(r);
3403 	lock_rsb(r);
3404 
3405 	error = validate_message(lkb, ms);
3406 	if (error)
3407 		goto out;
3408 
3409 	mstype = lkb->lkb_wait_type;
3410 	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3411 	if (error)
3412 		goto out;
3413 
3414 	/* Optimization: the dir node was also the master, so it took our
3415 	   lookup as a request and sent a request reply instead of a lookup reply */
3416 	if (mstype == DLM_MSG_LOOKUP) {
3417 		r->res_nodeid = ms->m_header.h_nodeid;
3418 		lkb->lkb_nodeid = r->res_nodeid;
3419 	}
3420 
3421 	/* this is the value returned from do_request() on the master */
3422 	result = ms->m_result;
3423 
3424 	switch (result) {
3425 	case -EAGAIN:
3426 		/* request would block (be queued) on remote master */
3427 		queue_cast(r, lkb, -EAGAIN);
3428 		confirm_master(r, -EAGAIN);
3429 		unhold_lkb(lkb); /* undoes create_lkb() */
3430 		break;
3431 
3432 	case -EINPROGRESS:
3433 	case 0:
3434 		/* request was queued or granted on remote master */
3435 		receive_flags_reply(lkb, ms);
3436 		lkb->lkb_remid = ms->m_lkid;
3437 		if (is_altmode(lkb))
3438 			munge_altmode(lkb, ms);
3439 		if (result) {
3440 			add_lkb(r, lkb, DLM_LKSTS_WAITING);
3441 			add_timeout(lkb);
3442 		} else {
3443 			grant_lock_pc(r, lkb, ms);
3444 			queue_cast(r, lkb, 0);
3445 		}
3446 		confirm_master(r, result);
3447 		break;
3448 
3449 	case -EBADR:
3450 	case -ENOTBLK:
3451 		/* find_rsb failed to find rsb or rsb wasn't master */
3452 		log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3453 			  lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3454 		r->res_nodeid = -1;
3455 		lkb->lkb_nodeid = -1;
3456 
3457 		if (is_overlap(lkb)) {
3458 			/* we'll ignore error in cancel/unlock reply */
3459 			queue_cast_overlap(r, lkb);
3460 			confirm_master(r, result);
3461 			unhold_lkb(lkb); /* undoes create_lkb() */
3462 		} else
3463 			_request_lock(r, lkb);
3464 		break;
3465 
3466 	default:
3467 		log_error(ls, "receive_request_reply %x error %d",
3468 			  lkb->lkb_id, result);
3469 	}
3470 
3471 	if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3472 		log_debug(ls, "receive_request_reply %x result %d unlock",
3473 			  lkb->lkb_id, result);
3474 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3475 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3476 		send_unlock(r, lkb);
3477 	} else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3478 		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3479 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3480 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3481 		send_cancel(r, lkb);
3482 	} else {
3483 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3484 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3485 	}
3486  out:
3487 	unlock_rsb(r);
3488 	put_rsb(r);
3489 	dlm_put_lkb(lkb);
3490 }
3491 
3492 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3493 				    struct dlm_message *ms)
3494 {
3495 	/* this is the value returned from do_convert() on the master */
3496 	switch (ms->m_result) {
3497 	case -EAGAIN:
3498 		/* convert would block (be queued) on remote master */
3499 		queue_cast(r, lkb, -EAGAIN);
3500 		break;
3501 
3502 	case -EDEADLK:
3503 		receive_flags_reply(lkb, ms);
3504 		revert_lock_pc(r, lkb);
3505 		queue_cast(r, lkb, -EDEADLK);
3506 		break;
3507 
3508 	case -EINPROGRESS:
3509 		/* convert was queued on remote master */
3510 		receive_flags_reply(lkb, ms);
3511 		if (is_demoted(lkb))
3512 			munge_demoted(lkb, ms);
3513 		del_lkb(r, lkb);
3514 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3515 		add_timeout(lkb);
3516 		break;
3517 
3518 	case 0:
3519 		/* convert was granted on remote master */
3520 		receive_flags_reply(lkb, ms);
3521 		if (is_demoted(lkb))
3522 			munge_demoted(lkb, ms);
3523 		grant_lock_pc(r, lkb, ms);
3524 		queue_cast(r, lkb, 0);
3525 		break;
3526 
3527 	default:
3528 		log_error(r->res_ls, "receive_convert_reply %x error %d",
3529 			  lkb->lkb_id, ms->m_result);
3530 	}
3531 }
3532 
3533 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3534 {
3535 	struct dlm_rsb *r = lkb->lkb_resource;
3536 	int error;
3537 
3538 	hold_rsb(r);
3539 	lock_rsb(r);
3540 
3541 	error = validate_message(lkb, ms);
3542 	if (error)
3543 		goto out;
3544 
3545 	/* stub reply can happen with waiters_mutex held */
3546 	error = remove_from_waiters_ms(lkb, ms);
3547 	if (error)
3548 		goto out;
3549 
3550 	__receive_convert_reply(r, lkb, ms);
3551  out:
3552 	unlock_rsb(r);
3553 	put_rsb(r);
3554 }
3555 
3556 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3557 {
3558 	struct dlm_lkb *lkb;
3559 	int error;
3560 
3561 	error = find_lkb(ls, ms->m_remid, &lkb);
3562 	if (error) {
3563 		log_debug(ls, "receive_convert_reply from %d no lkb %x",
3564 			  ms->m_header.h_nodeid, ms->m_remid);
3565 		return;
3566 	}
3567 
3568 	_receive_convert_reply(lkb, ms);
3569 	dlm_put_lkb(lkb);
3570 }
3571 
3572 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3573 {
3574 	struct dlm_rsb *r = lkb->lkb_resource;
3575 	int error;
3576 
3577 	hold_rsb(r);
3578 	lock_rsb(r);
3579 
3580 	error = validate_message(lkb, ms);
3581 	if (error)
3582 		goto out;
3583 
3584 	/* stub reply can happen with waiters_mutex held */
3585 	error = remove_from_waiters_ms(lkb, ms);
3586 	if (error)
3587 		goto out;
3588 
3589 	/* this is the value returned from do_unlock() on the master */
3590 
3591 	switch (ms->m_result) {
3592 	case -DLM_EUNLOCK:
3593 		receive_flags_reply(lkb, ms);
3594 		remove_lock_pc(r, lkb);
3595 		queue_cast(r, lkb, -DLM_EUNLOCK);
3596 		break;
3597 	case -ENOENT:
3598 		break;
3599 	default:
3600 		log_error(r->res_ls, "receive_unlock_reply %x error %d",
3601 			  lkb->lkb_id, ms->m_result);
3602 	}
3603  out:
3604 	unlock_rsb(r);
3605 	put_rsb(r);
3606 }
3607 
3608 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3609 {
3610 	struct dlm_lkb *lkb;
3611 	int error;
3612 
3613 	error = find_lkb(ls, ms->m_remid, &lkb);
3614 	if (error) {
3615 		log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3616 			  ms->m_header.h_nodeid, ms->m_remid);
3617 		return;
3618 	}
3619 
3620 	_receive_unlock_reply(lkb, ms);
3621 	dlm_put_lkb(lkb);
3622 }
3623 
3624 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3625 {
3626 	struct dlm_rsb *r = lkb->lkb_resource;
3627 	int error;
3628 
3629 	hold_rsb(r);
3630 	lock_rsb(r);
3631 
3632 	error = validate_message(lkb, ms);
3633 	if (error)
3634 		goto out;
3635 
3636 	/* stub reply can happen with waiters_mutex held */
3637 	error = remove_from_waiters_ms(lkb, ms);
3638 	if (error)
3639 		goto out;
3640 
3641 	/* this is the value returned from do_cancel() on the master */
3642 
3643 	switch (ms->m_result) {
3644 	case -DLM_ECANCEL:
3645 		receive_flags_reply(lkb, ms);
3646 		revert_lock_pc(r, lkb);
3647 		queue_cast(r, lkb, -DLM_ECANCEL);
3648 		break;
3649 	case 0:
3650 		break;
3651 	default:
3652 		log_error(r->res_ls, "receive_cancel_reply %x error %d",
3653 			  lkb->lkb_id, ms->m_result);
3654 	}
3655  out:
3656 	unlock_rsb(r);
3657 	put_rsb(r);
3658 }
3659 
3660 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3661 {
3662 	struct dlm_lkb *lkb;
3663 	int error;
3664 
3665 	error = find_lkb(ls, ms->m_remid, &lkb);
3666 	if (error) {
3667 		log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3668 			  ms->m_header.h_nodeid, ms->m_remid);
3669 		return;
3670 	}
3671 
3672 	_receive_cancel_reply(lkb, ms);
3673 	dlm_put_lkb(lkb);
3674 }
3675 
3676 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3677 {
3678 	struct dlm_lkb *lkb;
3679 	struct dlm_rsb *r;
3680 	int error, ret_nodeid;
3681 
3682 	error = find_lkb(ls, ms->m_lkid, &lkb);
3683 	if (error) {
3684 		log_error(ls, "receive_lookup_reply no lkb");
3685 		return;
3686 	}
3687 
3688 	/* ms->m_result is the value returned by dlm_dir_lookup on the dir node.
3689 	   FIXME: will a non-zero error ever be returned? */
3690 
3691 	r = lkb->lkb_resource;
3692 	hold_rsb(r);
3693 	lock_rsb(r);
3694 
3695 	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3696 	if (error)
3697 		goto out;
3698 
3699 	ret_nodeid = ms->m_nodeid;
3700 	if (ret_nodeid == dlm_our_nodeid()) {
3701 		r->res_nodeid = 0;
3702 		ret_nodeid = 0;
3703 		r->res_first_lkid = 0;
3704 	} else {
3705 		/* set_master() will copy res_nodeid to lkb_nodeid */
3706 		r->res_nodeid = ret_nodeid;
3707 	}
3708 
3709 	if (is_overlap(lkb)) {
3710 		log_debug(ls, "receive_lookup_reply %x unlock %x",
3711 			  lkb->lkb_id, lkb->lkb_flags);
3712 		queue_cast_overlap(r, lkb);
3713 		unhold_lkb(lkb); /* undoes create_lkb() */
3714 		goto out_list;
3715 	}
3716 
3717 	_request_lock(r, lkb);
3718 
3719  out_list:
3720 	if (!ret_nodeid)
3721 		process_lookup_list(r);
3722  out:
3723 	unlock_rsb(r);
3724 	put_rsb(r);
3725 	dlm_put_lkb(lkb);
3726 }
3727 
3728 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3729 {
3730 	if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3731 		log_debug(ls, "ignore non-member message %d from %d %x %x %d",
3732 			  ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
3733 			  ms->m_remid, ms->m_result);
3734 		return;
3735 	}
3736 
3737 	switch (ms->m_type) {
3738 
3739 	/* messages sent to a master node */
3740 
3741 	case DLM_MSG_REQUEST:
3742 		receive_request(ls, ms);
3743 		break;
3744 
3745 	case DLM_MSG_CONVERT:
3746 		receive_convert(ls, ms);
3747 		break;
3748 
3749 	case DLM_MSG_UNLOCK:
3750 		receive_unlock(ls, ms);
3751 		break;
3752 
3753 	case DLM_MSG_CANCEL:
3754 		receive_cancel(ls, ms);
3755 		break;
3756 
3757 	/* messages sent from a master node (replies to above) */
3758 
3759 	case DLM_MSG_REQUEST_REPLY:
3760 		receive_request_reply(ls, ms);
3761 		break;
3762 
3763 	case DLM_MSG_CONVERT_REPLY:
3764 		receive_convert_reply(ls, ms);
3765 		break;
3766 
3767 	case DLM_MSG_UNLOCK_REPLY:
3768 		receive_unlock_reply(ls, ms);
3769 		break;
3770 
3771 	case DLM_MSG_CANCEL_REPLY:
3772 		receive_cancel_reply(ls, ms);
3773 		break;
3774 
3775 	/* messages sent from a master node (only two types of async msg) */
3776 
3777 	case DLM_MSG_GRANT:
3778 		receive_grant(ls, ms);
3779 		break;
3780 
3781 	case DLM_MSG_BAST:
3782 		receive_bast(ls, ms);
3783 		break;
3784 
3785 	/* messages sent to a dir node */
3786 
3787 	case DLM_MSG_LOOKUP:
3788 		receive_lookup(ls, ms);
3789 		break;
3790 
3791 	case DLM_MSG_REMOVE:
3792 		receive_remove(ls, ms);
3793 		break;
3794 
3795 	/* messages sent from a dir node (remove has no reply) */
3796 
3797 	case DLM_MSG_LOOKUP_REPLY:
3798 		receive_lookup_reply(ls, ms);
3799 		break;
3800 
3801 	/* other messages */
3802 
3803 	case DLM_MSG_PURGE:
3804 		receive_purge(ls, ms);
3805 		break;
3806 
3807 	default:
3808 		log_error(ls, "unknown message type %d", ms->m_type);
3809 	}
3810 
3811 	dlm_astd_wake();
3812 }
3813 
3814 /* If the lockspace is in recovery mode (locking stopped), then normal
3815    messages are saved on the requestqueue for processing after recovery is
3816    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
3817    messages off the requestqueue before we process new ones. This occurs right
3818    after recovery completes when we transition from saving all messages on
3819    requestqueue, to processing all the saved messages, to processing new
3820    the requestqueue, to processing all the saved messages, to processing new
3821 
3822 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
3823 				int nodeid)
3824 {
3825 	if (dlm_locking_stopped(ls)) {
3826 		dlm_add_requestqueue(ls, nodeid, ms);
3827 	} else {
3828 		dlm_wait_requestqueue(ls);
3829 		_receive_message(ls, ms);
3830 	}
3831 }
3832 
3833 /* This is called by dlm_recoverd to process messages that were saved on
3834    the requestqueue. */
3835 
3836 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
3837 {
3838 	_receive_message(ls, ms);
3839 }
3840 
3841 /* This is called by the midcomms layer when something is received for
3842    the lockspace.  It could be either a MSG (normal message sent as part of
3843    standard locking activity) or an RCOM (recovery message sent as part of
3844    lockspace recovery). */
3845 
3846 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
3847 {
3848 	struct dlm_header *hd = &p->header;
3849 	struct dlm_ls *ls;
3850 	int type = 0;
3851 
3852 	switch (hd->h_cmd) {
3853 	case DLM_MSG:
3854 		dlm_message_in(&p->message);
3855 		type = p->message.m_type;
3856 		break;
3857 	case DLM_RCOM:
3858 		dlm_rcom_in(&p->rcom);
3859 		type = p->rcom.rc_type;
3860 		break;
3861 	default:
3862 		log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
3863 		return;
3864 	}
3865 
3866 	if (hd->h_nodeid != nodeid) {
3867 		log_print("invalid h_nodeid %d from %d lockspace %x",
3868 			  hd->h_nodeid, nodeid, hd->h_lockspace);
3869 		return;
3870 	}
3871 
3872 	ls = dlm_find_lockspace_global(hd->h_lockspace);
3873 	if (!ls) {
3874 		if (dlm_config.ci_log_debug)
3875 			log_print("invalid lockspace %x from %d cmd %d type %d",
3876 				  hd->h_lockspace, nodeid, hd->h_cmd, type);
3877 
3878 		if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
3879 			dlm_send_ls_not_ready(nodeid, &p->rcom);
3880 		return;
3881 	}
3882 
3883 	/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
3884 	   be inactive (in this ls) before transitioning to recovery mode */
3885 
3886 	down_read(&ls->ls_recv_active);
3887 	if (hd->h_cmd == DLM_MSG)
3888 		dlm_receive_message(ls, &p->message, nodeid);
3889 	else
3890 		dlm_receive_rcom(ls, &p->rcom, nodeid);
3891 	up_read(&ls->ls_recv_active);
3892 
3893 	dlm_put_lockspace(ls);
3894 }
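
/* A sketch of the checks dlm_receive_buffer() applies before dispatching,
   assuming a packet handed up by the midcomms layer:

   1. h_cmd must be DLM_MSG or DLM_RCOM; the payload is byte-swapped by
      dlm_message_in() or dlm_rcom_in() accordingly
   2. h_nodeid in the header must match the nodeid the packet arrived from
   3. h_lockspace must resolve via dlm_find_lockspace_global(); if not, a
      DLM_RCOM_STATUS sender gets a not-ready reply via
      dlm_send_ls_not_ready()
   4. the message or rcom is then processed under the ls_recv_active
      read lock
*/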
3895 
3896 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3897 {
3898 	if (middle_conversion(lkb)) {
3899 		hold_lkb(lkb);
3900 		ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
3901 		ls->ls_stub_ms.m_result = -EINPROGRESS;
3902 		ls->ls_stub_ms.m_flags = lkb->lkb_flags;
3903 		ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
3904 		_receive_convert_reply(lkb, &ls->ls_stub_ms);
3905 
3906 		/* Same special case as in receive_rcom_lock_args() */
3907 		lkb->lkb_grmode = DLM_LOCK_IV;
3908 		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3909 		unhold_lkb(lkb);
3910 
3911 	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3912 		lkb->lkb_flags |= DLM_IFL_RESEND;
3913 	}
3914 
3915 	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3916 	   conversions are async; there's no reply from the remote master */
3917 }
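
/* The middle_conversion() cases above are the PR<->CW conversions given
   special handling in receive_rcom_lock_args() below.  A sketch of the
   mode pairs assumed to be involved:

   grmode DLM_LOCK_PR, rqmode DLM_LOCK_CW
   grmode DLM_LOCK_CW, rqmode DLM_LOCK_PR

   Neither middle mode strictly contains the other, so the real granted
   mode can't be decided until all locks are rebuilt on the rsb. */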
3918 
3919 /* A waiting lkb needs recovery if the master node has failed, or
3920    the master node is changing (only when no directory is used) */
3921 
3922 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3923 {
3924 	if (dlm_is_removed(ls, lkb->lkb_nodeid))
3925 		return 1;
3926 
3927 	if (!dlm_no_directory(ls))
3928 		return 0;
3929 
3930 	if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3931 		return 1;
3932 
3933 	return 0;
3934 }
3935 
3936 /* Recovery for locks that are waiting for replies from nodes that are now
3937    gone.  We can just complete unlocks and cancels by faking a reply from the
3938    dead node.  Requests and up-conversions we flag to be resent after
3939    recovery.  Down-conversions can just be completed with a fake reply like
3940    unlocks.  Conversions between PR and CW need special attention. */
3941 
3942 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3943 {
3944 	struct dlm_lkb *lkb, *safe;
3945 	int wait_type, stub_unlock_result, stub_cancel_result;
3946 
3947 	mutex_lock(&ls->ls_waiters_mutex);
3948 
3949 	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3950 		log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3951 			  lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3952 
3953 		/* all outstanding lookups, regardless of destination, will be
3954 		   resent after recovery is done */
3955 
3956 		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3957 			lkb->lkb_flags |= DLM_IFL_RESEND;
3958 			continue;
3959 		}
3960 
3961 		if (!waiter_needs_recovery(ls, lkb))
3962 			continue;
3963 
3964 		wait_type = lkb->lkb_wait_type;
3965 		stub_unlock_result = -DLM_EUNLOCK;
3966 		stub_cancel_result = -DLM_ECANCEL;
3967 
3968 		/* Main reply may have been received leaving a zero wait_type,
3969 		   but a reply for the overlapping op may not have been
3970 		   received.  In that case we need to fake the appropriate
3971 		   reply for the overlap op. */
3972 
3973 		if (!wait_type) {
3974 			if (is_overlap_cancel(lkb)) {
3975 				wait_type = DLM_MSG_CANCEL;
3976 				if (lkb->lkb_grmode == DLM_LOCK_IV)
3977 					stub_cancel_result = 0;
3978 			}
3979 			if (is_overlap_unlock(lkb)) {
3980 				wait_type = DLM_MSG_UNLOCK;
3981 				if (lkb->lkb_grmode == DLM_LOCK_IV)
3982 					stub_unlock_result = -ENOENT;
3983 			}
3984 
3985 			log_debug(ls, "rwpre overlap %x %x %d %d %d",
3986 				  lkb->lkb_id, lkb->lkb_flags, wait_type,
3987 				  stub_cancel_result, stub_unlock_result);
3988 		}
3989 
3990 		switch (wait_type) {
3991 
3992 		case DLM_MSG_REQUEST:
3993 			lkb->lkb_flags |= DLM_IFL_RESEND;
3994 			break;
3995 
3996 		case DLM_MSG_CONVERT:
3997 			recover_convert_waiter(ls, lkb);
3998 			break;
3999 
4000 		case DLM_MSG_UNLOCK:
4001 			hold_lkb(lkb);
4002 			ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
4003 			ls->ls_stub_ms.m_result = stub_unlock_result;
4004 			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4005 			ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4006 			_receive_unlock_reply(lkb, &ls->ls_stub_ms);
4007 			dlm_put_lkb(lkb);
4008 			break;
4009 
4010 		case DLM_MSG_CANCEL:
4011 			hold_lkb(lkb);
4012 			ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
4013 			ls->ls_stub_ms.m_result = stub_cancel_result;
4014 			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
4015 			ls->ls_stub_ms.m_header.h_nodeid = lkb->lkb_nodeid;
4016 			_receive_cancel_reply(lkb, &ls->ls_stub_ms);
4017 			dlm_put_lkb(lkb);
4018 			break;
4019 
4020 		default:
4021 			log_error(ls, "invalid lkb wait_type %d %d",
4022 				  lkb->lkb_wait_type, wait_type);
4023 		}
4024 		schedule();
4025 	}
4026 	mutex_unlock(&ls->ls_waiters_mutex);
4027 }
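
/* Condensed view of the switch above -- what _pre does per wait_type:

   DLM_MSG_LOOKUP    flag DLM_IFL_RESEND, resend after recovery
   DLM_MSG_REQUEST   flag DLM_IFL_RESEND, resend after recovery
   DLM_MSG_CONVERT   middle PR/CW: fake -EINPROGRESS reply; up: RESEND
   DLM_MSG_UNLOCK    fake a reply carrying stub_unlock_result
   DLM_MSG_CANCEL    fake a reply carrying stub_cancel_result
*/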
4028 
4029 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4030 {
4031 	struct dlm_lkb *lkb;
4032 	int found = 0;
4033 
4034 	mutex_lock(&ls->ls_waiters_mutex);
4035 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4036 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
4037 			hold_lkb(lkb);
4038 			found = 1;
4039 			break;
4040 		}
4041 	}
4042 	mutex_unlock(&ls->ls_waiters_mutex);
4043 
4044 	if (!found)
4045 		lkb = NULL;
4046 	return lkb;
4047 }
4048 
4049 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4050    master or dir-node for r.  Processing the lkb may result in it being placed
4051    back on the waiters list. */
4052 
4053 /* We do this after normal locking has been enabled and any saved messages
4054    (in requestqueue) have been processed.  We should be confident that at
4055    this point we won't get or process a reply to any of these waiting
4056    operations.  But new ops may be coming in on the rsbs/locks here from
4057    userspace or remotely. */
4058 
4059 /* There may have been an overlap unlock/cancel prior to recovery or after
4060    recovery.  If before, the lkb may still have a positive wait_count; if
4061    after, the overlap flag would just have been set and nothing new sent.
4062    We can be confident here that any replies to either the initial op or
4063    overlap ops prior to recovery have been received. */
4064 
4065 int dlm_recover_waiters_post(struct dlm_ls *ls)
4066 {
4067 	struct dlm_lkb *lkb;
4068 	struct dlm_rsb *r;
4069 	int error = 0, mstype, err, oc, ou;
4070 
4071 	while (1) {
4072 		if (dlm_locking_stopped(ls)) {
4073 			log_debug(ls, "recover_waiters_post aborted");
4074 			error = -EINTR;
4075 			break;
4076 		}
4077 
4078 		lkb = find_resend_waiter(ls);
4079 		if (!lkb)
4080 			break;
4081 
4082 		r = lkb->lkb_resource;
4083 		hold_rsb(r);
4084 		lock_rsb(r);
4085 
4086 		mstype = lkb->lkb_wait_type;
4087 		oc = is_overlap_cancel(lkb);
4088 		ou = is_overlap_unlock(lkb);
4089 		err = 0;
4090 
4091 		log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
4092 			  lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
4093 
4094 		/* At this point we assume that we won't get a reply to any
4095 		   previous op or overlap op on this lock.  First, do a big
4096 		   remove_from_waiters() for all previous ops. */
4097 
4098 		lkb->lkb_flags &= ~DLM_IFL_RESEND;
4099 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4100 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4101 		lkb->lkb_wait_type = 0;
4102 		lkb->lkb_wait_count = 0;
4103 		mutex_lock(&ls->ls_waiters_mutex);
4104 		list_del_init(&lkb->lkb_wait_reply);
4105 		mutex_unlock(&ls->ls_waiters_mutex);
4106 		unhold_lkb(lkb); /* for waiters list */
4107 
4108 		if (oc || ou) {
4109 			/* do an unlock or cancel instead of resending */
4110 			switch (mstype) {
4111 			case DLM_MSG_LOOKUP:
4112 			case DLM_MSG_REQUEST:
4113 				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4114 							-DLM_ECANCEL);
4115 				unhold_lkb(lkb); /* undoes create_lkb() */
4116 				break;
4117 			case DLM_MSG_CONVERT:
4118 				if (oc) {
4119 					queue_cast(r, lkb, -DLM_ECANCEL);
4120 				} else {
4121 					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4122 					_unlock_lock(r, lkb);
4123 				}
4124 				break;
4125 			default:
4126 				err = 1;
4127 			}
4128 		} else {
4129 			switch (mstype) {
4130 			case DLM_MSG_LOOKUP:
4131 			case DLM_MSG_REQUEST:
4132 				_request_lock(r, lkb);
4133 				if (is_master(r))
4134 					confirm_master(r, 0);
4135 				break;
4136 			case DLM_MSG_CONVERT:
4137 				_convert_lock(r, lkb);
4138 				break;
4139 			default:
4140 				err = 1;
4141 			}
4142 		}
4143 
4144 		if (err)
4145 			log_error(ls, "recover_waiters_post %x %d %x %d %d",
4146 				  lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4147 		unlock_rsb(r);
4148 		put_rsb(r);
4149 		dlm_put_lkb(lkb);
4150 	}
4151 
4152 	return error;
4153 }
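
/* Condensed view of the two switches above -- what _post does per lkb:

   overlap unlock/cancel pending:
     LOOKUP/REQUEST  queue_cast(-DLM_EUNLOCK or -DLM_ECANCEL), drop lkb
     CONVERT         cancel: queue_cast(-DLM_ECANCEL)
                     unlock: set DLM_LKF_FORCEUNLOCK, _unlock_lock()
   no overlap:
     LOOKUP/REQUEST  _request_lock(), confirm_master() if we're master
     CONVERT         _convert_lock()
*/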
4154 
4155 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4156 			int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4157 {
4158 	struct dlm_ls *ls = r->res_ls;
4159 	struct dlm_lkb *lkb, *safe;
4160 
4161 	list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4162 		if (test(ls, lkb)) {
4163 			rsb_set_flag(r, RSB_LOCKS_PURGED);
4164 			del_lkb(r, lkb);
4165 			/* this put should free the lkb */
4166 			if (!dlm_put_lkb(lkb))
4167 				log_error(ls, "purged lkb not released");
4168 		}
4169 	}
4170 }
4171 
4172 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4173 {
4174 	return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4175 }
4176 
4177 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4178 {
4179 	return is_master_copy(lkb);
4180 }
4181 
4182 static void purge_dead_locks(struct dlm_rsb *r)
4183 {
4184 	purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4185 	purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4186 	purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4187 }
4188 
4189 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4190 {
4191 	purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4192 	purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4193 	purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4194 }
4195 
4196 /* Get rid of locks held by nodes that are gone. */
4197 
4198 int dlm_purge_locks(struct dlm_ls *ls)
4199 {
4200 	struct dlm_rsb *r;
4201 
4202 	log_debug(ls, "dlm_purge_locks");
4203 
4204 	down_write(&ls->ls_root_sem);
4205 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4206 		hold_rsb(r);
4207 		lock_rsb(r);
4208 		if (is_master(r))
4209 			purge_dead_locks(r);
4210 		unlock_rsb(r);
4211 		unhold_rsb(r);
4212 
4213 		schedule();
4214 	}
4215 	up_write(&ls->ls_root_sem);
4216 
4217 	return 0;
4218 }
4219 
4220 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4221 {
4222 	struct dlm_rsb *r, *r_ret = NULL;
4223 
4224 	read_lock(&ls->ls_rsbtbl[bucket].lock);
4225 	list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4226 		if (!rsb_flag(r, RSB_LOCKS_PURGED))
4227 			continue;
4228 		hold_rsb(r);
4229 		rsb_clear_flag(r, RSB_LOCKS_PURGED);
4230 		r_ret = r;
4231 		break;
4232 	}
4233 	read_unlock(&ls->ls_rsbtbl[bucket].lock);
4234 	return r_ret;
4235 }
4236 
4237 void dlm_grant_after_purge(struct dlm_ls *ls)
4238 {
4239 	struct dlm_rsb *r;
4240 	int bucket = 0;
4241 
4242 	while (1) {
4243 		r = find_purged_rsb(ls, bucket);
4244 		if (!r) {
4245 			if (bucket == ls->ls_rsbtbl_size - 1)
4246 				break;
4247 			bucket++;
4248 			continue;
4249 		}
4250 		lock_rsb(r);
4251 		if (is_master(r)) {
4252 			grant_pending_locks(r);
4253 			confirm_master(r, 0);
4254 		}
4255 		unlock_rsb(r);
4256 		put_rsb(r);
4257 		schedule();
4258 	}
4259 }
4260 
4261 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4262 					 uint32_t remid)
4263 {
4264 	struct dlm_lkb *lkb;
4265 
4266 	list_for_each_entry(lkb, head, lkb_statequeue) {
4267 		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4268 			return lkb;
4269 	}
4270 	return NULL;
4271 }
4272 
4273 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4274 				    uint32_t remid)
4275 {
4276 	struct dlm_lkb *lkb;
4277 
4278 	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4279 	if (lkb)
4280 		return lkb;
4281 	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4282 	if (lkb)
4283 		return lkb;
4284 	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4285 	if (lkb)
4286 		return lkb;
4287 	return NULL;
4288 }
4289 
4290 /* needs at least dlm_rcom + rcom_lock */
4291 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4292 				  struct dlm_rsb *r, struct dlm_rcom *rc)
4293 {
4294 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4295 
4296 	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4297 	lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4298 	lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4299 	lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4300 	lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
4301 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
4302 	lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
4303 	lkb->lkb_rqmode = rl->rl_rqmode;
4304 	lkb->lkb_grmode = rl->rl_grmode;
4305 	/* don't set lkb_status because add_lkb wants to set it itself */
4306 
4307 	lkb->lkb_bastfn = (rl->rl_asts & AST_BAST) ? &fake_bastfn : NULL;
4308 	lkb->lkb_astfn = (rl->rl_asts & AST_COMP) ? &fake_astfn : NULL;
4309 
4310 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4311 		int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4312 			 sizeof(struct rcom_lock);
4313 		if (lvblen > ls->ls_lvblen)
4314 			return -EINVAL;
4315 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
4316 		if (!lkb->lkb_lvbptr)
4317 			return -ENOMEM;
4318 		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4319 	}
4320 
4321 	/* Conversions between PR and CW (middle modes) need special handling.
4322 	   The real granted mode of these converting locks cannot be determined
4323 	   until all locks have been rebuilt on the rsb (recover_conversion) */
4324 
4325 	if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
4326 	    middle_conversion(lkb)) {
4327 		rl->rl_status = DLM_LKSTS_CONVERT;
4328 		lkb->lkb_grmode = DLM_LOCK_IV;
4329 		rsb_set_flag(r, RSB_RECOVER_CONVERT);
4330 	}
4331 
4332 	return 0;
4333 }
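
/* Worked example for the lvblen computation above, with illustrative
   sizes (the real ones depend on the arch and struct layout): if
   rc_header.h_length = 512, sizeof(struct dlm_rcom) = 40 and
   sizeof(struct rcom_lock) = 88, then lvblen = 512 - 40 - 88 = 384,
   which is rejected with -EINVAL whenever 384 > ls->ls_lvblen. */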
4334 
4335 /* This lkb may have been recovered in a previous aborted recovery so we need
4336    to check if the rsb already has an lkb with the given remote nodeid/lkid.
4337    If so we just send back a standard reply.  If not, we create a new lkb with
4338    the given values and send back our lkid.  We send back our lkid by sending
4339    back the rcom_lock struct we got but with the remid field filled in. */
4340 
4341 /* needs at least dlm_rcom + rcom_lock */
4342 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4343 {
4344 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4345 	struct dlm_rsb *r;
4346 	struct dlm_lkb *lkb;
4347 	int error;
4348 
4349 	if (rl->rl_parent_lkid) {
4350 		error = -EOPNOTSUPP;
4351 		goto out;
4352 	}
4353 
4354 	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
4355 			 R_MASTER, &r);
4356 	if (error)
4357 		goto out;
4358 
4359 	lock_rsb(r);
4360 
4361 	lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
4362 	if (lkb) {
4363 		error = -EEXIST;
4364 		goto out_remid;
4365 	}
4366 
4367 	error = create_lkb(ls, &lkb);
4368 	if (error)
4369 		goto out_unlock;
4370 
4371 	error = receive_rcom_lock_args(ls, lkb, r, rc);
4372 	if (error) {
4373 		__put_lkb(ls, lkb);
4374 		goto out_unlock;
4375 	}
4376 
4377 	attach_lkb(r, lkb);
4378 	add_lkb(r, lkb, rl->rl_status);
4379 	error = 0;
4380 
4381  out_remid:
4382 	/* this is the new value returned to the lock holder for
4383 	   saving in its process-copy lkb */
4384 	rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4385 
4386  out_unlock:
4387 	unlock_rsb(r);
4388 	put_rsb(r);
4389  out:
4390 	if (error)
4391 		log_debug(ls, "recover_master_copy %d %x", error,
4392 			  le32_to_cpu(rl->rl_lkid));
4393 	rl->rl_result = cpu_to_le32(error);
4394 	return error;
4395 }
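
/* Connecting calls for rebuilding a lock on a new master -- a sketch,
   with L the node resending its process-copy lkb and R the new master;
   the reply is handled by dlm_recover_process_copy() below:

   L: dlm_send_rcom_lock()          ->  R: dlm_recover_master_copy()
                                        R: fills in rl_remid, rl_result
   L: dlm_recover_process_copy()    <-  R: (rcom lock reply)
*/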
4396 
4397 /* needs at least dlm_rcom + rcom_lock */
4398 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4399 {
4400 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4401 	struct dlm_rsb *r;
4402 	struct dlm_lkb *lkb;
4403 	int error;
4404 
4405 	error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
4406 	if (error) {
4407 		log_error(ls, "recover_process_copy no lkid %x",
4408 				le32_to_cpu(rl->rl_lkid));
4409 		return error;
4410 	}
4411 
4412 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4413 
4414 	error = le32_to_cpu(rl->rl_result);
4415 
4416 	r = lkb->lkb_resource;
4417 	hold_rsb(r);
4418 	lock_rsb(r);
4419 
4420 	switch (error) {
4421 	case -EBADR:
4422 		/* There's a chance the new master received our lock before
4423 		   dlm_recover_master_reply(); this wouldn't happen if we did
4424 		   a barrier between recover_masters and recover_locks. */
4425 		log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4426 			  (unsigned long)r, r->res_name);
4427 		dlm_send_rcom_lock(r, lkb);
4428 		goto out;
4429 	case -EEXIST:
4430 		log_debug(ls, "master copy exists %x", lkb->lkb_id);
4431 		/* fall through */
4432 	case 0:
4433 		lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
4434 		break;
4435 	default:
4436 		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4437 			  error, lkb->lkb_id);
4438 	}
4439 
4440 	/* an ack for dlm_recover_locks(), which waits for replies to
4441 	   all the locks it sends to new masters */
4442 	dlm_recovered_lock(r);
4443  out:
4444 	unlock_rsb(r);
4445 	put_rsb(r);
4446 	dlm_put_lkb(lkb);
4447 
4448 	return 0;
4449 }
4450 
4451 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4452 		     int mode, uint32_t flags, void *name, unsigned int namelen,
4453 		     unsigned long timeout_cs)
4454 {
4455 	struct dlm_lkb *lkb;
4456 	struct dlm_args args;
4457 	int error;
4458 
4459 	dlm_lock_recovery(ls);
4460 
4461 	error = create_lkb(ls, &lkb);
4462 	if (error) {
4463 		kfree(ua);
4464 		goto out;
4465 	}
4466 
4467 	if (flags & DLM_LKF_VALBLK) {
4468 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4469 		if (!ua->lksb.sb_lvbptr) {
4470 			kfree(ua);
4471 			__put_lkb(ls, lkb);
4472 			error = -ENOMEM;
4473 			goto out;
4474 		}
4475 	}
4476 
4477 	/* After ua is attached to lkb, it will be freed by dlm_free_lkb().
4478 	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
4479 	   lock and that lkb_astparam is the dlm_user_args structure. */
4480 
4481 	error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4482 			      fake_astfn, ua, fake_bastfn, &args);
4483 	lkb->lkb_flags |= DLM_IFL_USER;
4484 	ua->old_mode = DLM_LOCK_IV;
4485 
4486 	if (error) {
4487 		__put_lkb(ls, lkb);
4488 		goto out;
4489 	}
4490 
4491 	error = request_lock(ls, lkb, name, namelen, &args);
4492 
4493 	switch (error) {
4494 	case 0:
4495 		break;
4496 	case -EINPROGRESS:
4497 		error = 0;
4498 		break;
4499 	case -EAGAIN:
4500 		error = 0;
4501 		/* fall through */
4502 	default:
4503 		__put_lkb(ls, lkb);
4504 		goto out;
4505 	}
4506 
4507 	/* add this new lkb to the per-process list of locks */
4508 	spin_lock(&ua->proc->locks_spin);
4509 	hold_lkb(lkb);
4510 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4511 	spin_unlock(&ua->proc->locks_spin);
4512  out:
4513 	dlm_unlock_recovery(ls);
4514 	return error;
4515 }
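
/* A minimal usage sketch for dlm_user_request() -- a hypothetical caller;
   the real callers are expected to live in the device interface (user.c),
   with proc, name and namelen coming from the device write.  Note that on
   error dlm_user_request() has already freed ua:

	struct dlm_user_args *ua = kzalloc(sizeof(*ua), GFP_KERNEL);
	int rv;

	if (!ua)
		return -ENOMEM;
	ua->proc = proc;	(per-process state from the open file)
	rv = dlm_user_request(ls, ua, DLM_LOCK_EX, 0, name, namelen, 0);
*/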
4516 
4517 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4518 		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4519 		     unsigned long timeout_cs)
4520 {
4521 	struct dlm_lkb *lkb;
4522 	struct dlm_args args;
4523 	struct dlm_user_args *ua;
4524 	int error;
4525 
4526 	dlm_lock_recovery(ls);
4527 
4528 	error = find_lkb(ls, lkid, &lkb);
4529 	if (error)
4530 		goto out;
4531 
4532 	/* user can change the params on its lock when it converts it, or
4533 	   add an lvb that didn't exist before */
4534 
4535 	ua = lkb->lkb_ua;
4536 
4537 	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4538 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
4539 		if (!ua->lksb.sb_lvbptr) {
4540 			error = -ENOMEM;
4541 			goto out_put;
4542 		}
4543 	}
4544 	if (lvb_in && ua->lksb.sb_lvbptr)
4545 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4546 
4547 	ua->xid = ua_tmp->xid;
4548 	ua->castparam = ua_tmp->castparam;
4549 	ua->castaddr = ua_tmp->castaddr;
4550 	ua->bastparam = ua_tmp->bastparam;
4551 	ua->bastaddr = ua_tmp->bastaddr;
4552 	ua->user_lksb = ua_tmp->user_lksb;
4553 	ua->old_mode = lkb->lkb_grmode;
4554 
4555 	error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4556 			      fake_astfn, ua, fake_bastfn, &args);
4557 	if (error)
4558 		goto out_put;
4559 
4560 	error = convert_lock(ls, lkb, &args);
4561 
4562 	if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4563 		error = 0;
4564  out_put:
4565 	dlm_put_lkb(lkb);
4566  out:
4567 	dlm_unlock_recovery(ls);
4568 	kfree(ua_tmp);
4569 	return error;
4570 }
4571 
4572 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4573 		    uint32_t flags, uint32_t lkid, char *lvb_in)
4574 {
4575 	struct dlm_lkb *lkb;
4576 	struct dlm_args args;
4577 	struct dlm_user_args *ua;
4578 	int error;
4579 
4580 	dlm_lock_recovery(ls);
4581 
4582 	error = find_lkb(ls, lkid, &lkb);
4583 	if (error)
4584 		goto out;
4585 
4586 	ua = lkb->lkb_ua;
4587 
4588 	if (lvb_in && ua->lksb.sb_lvbptr)
4589 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4590 	if (ua_tmp->castparam)
4591 		ua->castparam = ua_tmp->castparam;
4592 	ua->user_lksb = ua_tmp->user_lksb;
4593 
4594 	error = set_unlock_args(flags, ua, &args);
4595 	if (error)
4596 		goto out_put;
4597 
4598 	error = unlock_lock(ls, lkb, &args);
4599 
4600 	if (error == -DLM_EUNLOCK)
4601 		error = 0;
4602 	/* from validate_unlock_args() */
4603 	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4604 		error = 0;
4605 	if (error)
4606 		goto out_put;
4607 
4608 	spin_lock(&ua->proc->locks_spin);
4609 	/* dlm_user_add_ast() may have already taken lkb off the proc list */
4610 	if (!list_empty(&lkb->lkb_ownqueue))
4611 		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4612 	spin_unlock(&ua->proc->locks_spin);
4613  out_put:
4614 	dlm_put_lkb(lkb);
4615  out:
4616 	dlm_unlock_recovery(ls);
4617 	kfree(ua_tmp);
4618 	return error;
4619 }
4620 
4621 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4622 		    uint32_t flags, uint32_t lkid)
4623 {
4624 	struct dlm_lkb *lkb;
4625 	struct dlm_args args;
4626 	struct dlm_user_args *ua;
4627 	int error;
4628 
4629 	dlm_lock_recovery(ls);
4630 
4631 	error = find_lkb(ls, lkid, &lkb);
4632 	if (error)
4633 		goto out;
4634 
4635 	ua = lkb->lkb_ua;
4636 	if (ua_tmp->castparam)
4637 		ua->castparam = ua_tmp->castparam;
4638 	ua->user_lksb = ua_tmp->user_lksb;
4639 
4640 	error = set_unlock_args(flags, ua, &args);
4641 	if (error)
4642 		goto out_put;
4643 
4644 	error = cancel_lock(ls, lkb, &args);
4645 
4646 	if (error == -DLM_ECANCEL)
4647 		error = 0;
4648 	/* from validate_unlock_args() */
4649 	if (error == -EBUSY)
4650 		error = 0;
4651  out_put:
4652 	dlm_put_lkb(lkb);
4653  out:
4654 	dlm_unlock_recovery(ls);
4655 	kfree(ua_tmp);
4656 	return error;
4657 }
4658 
4659 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4660 {
4661 	struct dlm_lkb *lkb;
4662 	struct dlm_args args;
4663 	struct dlm_user_args *ua;
4664 	struct dlm_rsb *r;
4665 	int error;
4666 
4667 	dlm_lock_recovery(ls);
4668 
4669 	error = find_lkb(ls, lkid, &lkb);
4670 	if (error)
4671 		goto out;
4672 
4673 	ua = lkb->lkb_ua;
4674 
4675 	error = set_unlock_args(flags, ua, &args);
4676 	if (error)
4677 		goto out_put;
4678 
4679 	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4680 
4681 	r = lkb->lkb_resource;
4682 	hold_rsb(r);
4683 	lock_rsb(r);
4684 
4685 	error = validate_unlock_args(lkb, &args);
4686 	if (error)
4687 		goto out_r;
4688 	lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4689 
4690 	error = _cancel_lock(r, lkb);
4691  out_r:
4692 	unlock_rsb(r);
4693 	put_rsb(r);
4694 
4695 	if (error == -DLM_ECANCEL)
4696 		error = 0;
4697 	/* from validate_unlock_args() */
4698 	if (error == -EBUSY)
4699 		error = 0;
4700  out_put:
4701 	dlm_put_lkb(lkb);
4702  out:
4703 	dlm_unlock_recovery(ls);
4704 	return error;
4705 }
4706 
4707 /* lkb's that are removed from the waiters list by revert are just left on the
4708    orphans list with the granted orphan locks, to be freed by purge */
4709 
4710 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4711 {
4712 	struct dlm_args args;
4713 	int error;
4714 
4715 	hold_lkb(lkb);
4716 	mutex_lock(&ls->ls_orphans_mutex);
4717 	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4718 	mutex_unlock(&ls->ls_orphans_mutex);
4719 
4720 	set_unlock_args(0, lkb->lkb_ua, &args);
4721 
4722 	error = cancel_lock(ls, lkb, &args);
4723 	if (error == -DLM_ECANCEL)
4724 		error = 0;
4725 	return error;
4726 }
4727 
4728 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4729    Regardless of what rsb queue the lock is on, it's removed and freed. */
4730 
4731 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4732 {
4733 	struct dlm_args args;
4734 	int error;
4735 
4736 	set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
4737 
4738 	error = unlock_lock(ls, lkb, &args);
4739 	if (error == -DLM_EUNLOCK)
4740 		error = 0;
4741 	return error;
4742 }
4743 
4744 /* We have to release the clear_proc_locks mutex before calling
4745    unlock_proc_lock() (which does lock_rsb) to avoid deadlock with the
4746    receive path, which does lock_rsb followed by dlm_user_add_ast() */
4747 
4748 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4749 				     struct dlm_user_proc *proc)
4750 {
4751 	struct dlm_lkb *lkb = NULL;
4752 
4753 	mutex_lock(&ls->ls_clear_proc_locks);
4754 	if (list_empty(&proc->locks))
4755 		goto out;
4756 
4757 	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4758 	list_del_init(&lkb->lkb_ownqueue);
4759 
4760 	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4761 		lkb->lkb_flags |= DLM_IFL_ORPHAN;
4762 	else
4763 		lkb->lkb_flags |= DLM_IFL_DEAD;
4764  out:
4765 	mutex_unlock(&ls->ls_clear_proc_locks);
4766 	return lkb;
4767 }
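
/* The inversion avoided by the comment above, sketched -- assuming
   dlm_user_add_ast() takes ls_clear_proc_locks, as its pairing with this
   mutex suggests:

   A: dlm_clear_proc_locks            B: dlm_recv message path
      takes ls_clear_proc_locks          takes rsb lock (lock_rsb)
      unlock_proc_lock -> lock_rsb       dlm_user_add_ast ->
                                           ls_clear_proc_locks

   A holds the mutex and waits for the rsb lock; B holds the rsb lock and
   waits for the mutex.  Dropping the mutex before unlock_proc_lock()
   breaks the cycle. */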
4768 
4769 /* The ls_clear_proc_locks mutex protects against dlm_user_add_ast() which
4770    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4771    which we clear here. */
4772 
4773 /* The proc CLOSING flag is set, so no more device_reads should look at the
4774    proc->asts list, and no more device_writes should add lkb's to the
4775    proc->locks list; so we shouldn't need to take asts_spin or locks_spin
4776    here.  This assumes that device reads/writes/closes are serialized --
4777    FIXME: we may need to serialize them ourselves. */
4778 
4779 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4780 {
4781 	struct dlm_lkb *lkb, *safe;
4782 
4783 	dlm_lock_recovery(ls);
4784 
4785 	while (1) {
4786 		lkb = del_proc_lock(ls, proc);
4787 		if (!lkb)
4788 			break;
4789 		del_timeout(lkb);
4790 		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4791 			orphan_proc_lock(ls, lkb);
4792 		else
4793 			unlock_proc_lock(ls, lkb);
4794 
4795 		/* this removes the reference for the proc->locks list
4796 		   added by dlm_user_request, it may result in the lkb
4797 		   being freed */
4798 
4799 		dlm_put_lkb(lkb);
4800 	}
4801 
4802 	mutex_lock(&ls->ls_clear_proc_locks);
4803 
4804 	/* in-progress unlocks */
4805 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4806 		list_del_init(&lkb->lkb_ownqueue);
4807 		lkb->lkb_flags |= DLM_IFL_DEAD;
4808 		dlm_put_lkb(lkb);
4809 	}
4810 
4811 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4812 		lkb->lkb_ast_type = 0;
4813 		list_del(&lkb->lkb_astqueue);
4814 		dlm_put_lkb(lkb);
4815 	}
4816 
4817 	mutex_unlock(&ls->ls_clear_proc_locks);
4818 	dlm_unlock_recovery(ls);
4819 }
4820 
4821 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4822 {
4823 	struct dlm_lkb *lkb, *safe;
4824 
4825 	while (1) {
4826 		lkb = NULL;
4827 		spin_lock(&proc->locks_spin);
4828 		if (!list_empty(&proc->locks)) {
4829 			lkb = list_entry(proc->locks.next, struct dlm_lkb,
4830 					 lkb_ownqueue);
4831 			list_del_init(&lkb->lkb_ownqueue);
4832 		}
4833 		spin_unlock(&proc->locks_spin);
4834 
4835 		if (!lkb)
4836 			break;
4837 
4838 		lkb->lkb_flags |= DLM_IFL_DEAD;
4839 		unlock_proc_lock(ls, lkb);
4840 		dlm_put_lkb(lkb); /* ref from proc->locks list */
4841 	}
4842 
4843 	spin_lock(&proc->locks_spin);
4844 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4845 		list_del_init(&lkb->lkb_ownqueue);
4846 		lkb->lkb_flags |= DLM_IFL_DEAD;
4847 		dlm_put_lkb(lkb);
4848 	}
4849 	spin_unlock(&proc->locks_spin);
4850 
4851 	spin_lock(&proc->asts_spin);
4852 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4853 		list_del(&lkb->lkb_astqueue);
4854 		dlm_put_lkb(lkb);
4855 	}
4856 	spin_unlock(&proc->asts_spin);
4857 }
4858 
4859 /* pid of 0 means purge all orphans */
4860 
4861 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4862 {
4863 	struct dlm_lkb *lkb, *safe;
4864 
4865 	mutex_lock(&ls->ls_orphans_mutex);
4866 	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4867 		if (pid && lkb->lkb_ownpid != pid)
4868 			continue;
4869 		unlock_proc_lock(ls, lkb);
4870 		list_del_init(&lkb->lkb_ownqueue);
4871 		dlm_put_lkb(lkb);
4872 	}
4873 	mutex_unlock(&ls->ls_orphans_mutex);
4874 }
4875 
4876 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4877 {
4878 	struct dlm_message *ms;
4879 	struct dlm_mhandle *mh;
4880 	int error;
4881 
4882 	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4883 				DLM_MSG_PURGE, &ms, &mh);
4884 	if (error)
4885 		return error;
4886 	ms->m_nodeid = nodeid;
4887 	ms->m_pid = pid;
4888 
4889 	return send_message(mh, ms);
4890 }
4891 
4892 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4893 		   int nodeid, int pid)
4894 {
4895 	int error = 0;
4896 
4897 	if (nodeid != dlm_our_nodeid()) {
4898 		error = send_purge(ls, nodeid, pid);
4899 	} else {
4900 		dlm_lock_recovery(ls);
4901 		if (pid == current->pid)
4902 			purge_proc_locks(ls, proc);
4903 		else
4904 			do_purge(ls, nodeid, pid);
4905 		dlm_unlock_recovery(ls);
4906 	}
4907 	return error;
4908 }
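
/* Message flow for a remote purge target -- a sketch; the DLM_MSG_PURGE
   case in _receive_message() above routes the message to receive_purge(),
   which is expected to call do_purge() with the carried nodeid and pid:

   L: dlm_user_purge() -> send_purge()  ->  R: receive_purge()
                                            R: do_purge(ls, nodeid, pid)
*/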
4909 
4910