xref: /linux/fs/dlm/lock.c (revision 606d099cdd1080bbb50ea50dc52d98252f8f10a1)
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12 
13 /* Central locking logic has four stages:
14 
15    dlm_lock()
16    dlm_unlock()
17 
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22 
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27 
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32 
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35 
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40 
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43 
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46 
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49 
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53 
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
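
/*
 * Editor's sketch (not in the original source): the complete round trip
 * for a request whose rsb is mastered remotely, following the stages and
 * the send/receive table above:
 *
 *	dlm_lock() -> request_lock() -> _request_lock() -> send_request()
 *	master:  receive_request() -> do_request() -> send_request_reply()
 *	local:   receive_request_reply() grants, queues or fails the lkb
 */
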
58 #include <linux/types.h>
59 #include "dlm_internal.h"
60 #include <linux/dlm_device.h>
61 #include "memory.h"
62 #include "lowcomms.h"
63 #include "requestqueue.h"
64 #include "util.h"
65 #include "dir.h"
66 #include "member.h"
67 #include "lockspace.h"
68 #include "ast.h"
69 #include "lock.h"
70 #include "rcom.h"
71 #include "recover.h"
72 #include "lvb_table.h"
73 #include "user.h"
74 #include "config.h"
75 
76 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83 static int send_remove(struct dlm_rsb *r);
84 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
86 				    struct dlm_message *ms);
87 static int receive_extralen(struct dlm_message *ms);
88 
89 /*
90  * Lock compatibility matrix - thanks Steve
91  * UN = Unlocked state. Not really a state, used as a flag
92  * PD = Padding. Used to make the matrix a nice power of two in size
93  * Other states are the same as the VMS DLM.
94  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
95  */
96 
97 static const int __dlm_compat_matrix[8][8] = {
98       /* UN NL CR CW PR PW EX PD */
99         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
100         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
101         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
102         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
103         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
104         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
105         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
106         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
107 };
108 
109 /*
110  * This defines the direction of transfer of LVB data.
111  * Granted mode is the row; requested mode is the column.
112  * Usage: matrix[grmode+1][rqmode+1]
113  * 1 = LVB is returned to the caller
114  * 0 = LVB is written to the resource
115  * -1 = nothing happens to the LVB
116  */
117 
118 const int dlm_lvb_operations[8][8] = {
119         /* UN   NL  CR  CW  PR  PW  EX  PD*/
120         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
121         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
122         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
123         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
124         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
125         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
126         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
127         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
128 };
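
/*
 * Example (editor's note): converting up from NL to EX hits entry
 * [NL+1][EX+1] == 1, so the resource's LVB is copied back to the caller;
 * converting down from EX to NL hits [EX+1][NL+1] == 0, so the caller's
 * LVB is written to the resource.  See set_lvb_lock() below.
 */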
129 
130 #define modes_compat(gr, rq) \
131 	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
132 
133 int dlm_modes_compat(int mode1, int mode2)
134 {
135 	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
136 }
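
/*
 * Usage sketch (editor's addition): two PR locks can coexist while PR
 * and EX cannot, per __dlm_compat_matrix above:
 *
 *	dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR);	returns 1
 *	dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_EX);	returns 0 (conflict)
 */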
137 
138 /*
139  * Compatibility matrix for conversions with QUECVT set.
140  * Granted mode is the row; requested mode is the column.
141  * Usage: matrix[grmode+1][rqmode+1]
142  */
143 
144 static const int __quecvt_compat_matrix[8][8] = {
145       /* UN NL CR CW PR PW EX PD */
146         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
147         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
148         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
149         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
150         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
151         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
152         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
153         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
154 };
155 
156 void dlm_print_lkb(struct dlm_lkb *lkb)
157 {
158 	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
159 	       "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
160 	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
161 	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
162 	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
163 }
164 
165 void dlm_print_rsb(struct dlm_rsb *r)
166 {
167 	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
168 	       r->res_nodeid, r->res_flags, r->res_first_lkid,
169 	       r->res_recover_locks_count, r->res_name);
170 }
171 
172 void dlm_dump_rsb(struct dlm_rsb *r)
173 {
174 	struct dlm_lkb *lkb;
175 
176 	dlm_print_rsb(r);
177 
178 	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
179 	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
180 	printk(KERN_ERR "rsb lookup list\n");
181 	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
182 		dlm_print_lkb(lkb);
183 	printk(KERN_ERR "rsb grant queue:\n");
184 	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
185 		dlm_print_lkb(lkb);
186 	printk(KERN_ERR "rsb convert queue:\n");
187 	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
188 		dlm_print_lkb(lkb);
189 	printk(KERN_ERR "rsb wait queue:\n");
190 	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
191 		dlm_print_lkb(lkb);
192 }
193 
194 /* Threads cannot use the lockspace while it's being recovered */
195 
196 static inline void lock_recovery(struct dlm_ls *ls)
197 {
198 	down_read(&ls->ls_in_recovery);
199 }
200 
201 static inline void unlock_recovery(struct dlm_ls *ls)
202 {
203 	up_read(&ls->ls_in_recovery);
204 }
205 
206 static inline int lock_recovery_try(struct dlm_ls *ls)
207 {
208 	return down_read_trylock(&ls->ls_in_recovery);
209 }
210 
211 static inline int can_be_queued(struct dlm_lkb *lkb)
212 {
213 	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
214 }
215 
216 static inline int force_blocking_asts(struct dlm_lkb *lkb)
217 {
218 	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
219 }
220 
221 static inline int is_demoted(struct dlm_lkb *lkb)
222 {
223 	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
224 }
225 
226 static inline int is_remote(struct dlm_rsb *r)
227 {
228 	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
229 	return !!r->res_nodeid;
230 }
231 
232 static inline int is_process_copy(struct dlm_lkb *lkb)
233 {
234 	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
235 }
236 
237 static inline int is_master_copy(struct dlm_lkb *lkb)
238 {
239 	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
240 		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
241 	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
242 }
243 
244 static inline int middle_conversion(struct dlm_lkb *lkb)
245 {
246 	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
247 	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
248 		return 1;
249 	return 0;
250 }
251 
252 static inline int down_conversion(struct dlm_lkb *lkb)
253 {
254 	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
255 }
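
/*
 * Examples (editor's note): PR->CW and CW->PR are the two "middle"
 * conversions; EX->NL is a down-conversion (rqmode < grmode); NL->EX
 * is neither, i.e. an ordinary up-conversion.
 */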
256 
257 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
258 {
259 	if (is_master_copy(lkb))
260 		return;
261 
262 	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
263 
264 	lkb->lkb_lksb->sb_status = rv;
265 	lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
266 
267 	dlm_add_ast(lkb, AST_COMP);
268 }
269 
270 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
271 {
272 	if (is_master_copy(lkb))
273 		send_bast(r, lkb, rqmode);
274 	else {
275 		lkb->lkb_bastmode = rqmode;
276 		dlm_add_ast(lkb, AST_BAST);
277 	}
278 }
279 
280 /*
281  * Basic operations on rsb's and lkb's
282  */
283 
284 static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
285 {
286 	struct dlm_rsb *r;
287 
288 	r = allocate_rsb(ls, len);
289 	if (!r)
290 		return NULL;
291 
292 	r->res_ls = ls;
293 	r->res_length = len;
294 	memcpy(r->res_name, name, len);
295 	mutex_init(&r->res_mutex);
296 
297 	INIT_LIST_HEAD(&r->res_lookup);
298 	INIT_LIST_HEAD(&r->res_grantqueue);
299 	INIT_LIST_HEAD(&r->res_convertqueue);
300 	INIT_LIST_HEAD(&r->res_waitqueue);
301 	INIT_LIST_HEAD(&r->res_root_list);
302 	INIT_LIST_HEAD(&r->res_recover_list);
303 
304 	return r;
305 }
306 
307 static int search_rsb_list(struct list_head *head, char *name, int len,
308 			   unsigned int flags, struct dlm_rsb **r_ret)
309 {
310 	struct dlm_rsb *r;
311 	int error = 0;
312 
313 	list_for_each_entry(r, head, res_hashchain) {
314 		if (len == r->res_length && !memcmp(name, r->res_name, len))
315 			goto found;
316 	}
317 	return -EBADR;
318 
319  found:
320 	if (r->res_nodeid && (flags & R_MASTER))
321 		error = -ENOTBLK;
322 	*r_ret = r;
323 	return error;
324 }
325 
326 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
327 		       unsigned int flags, struct dlm_rsb **r_ret)
328 {
329 	struct dlm_rsb *r;
330 	int error;
331 
332 	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
333 	if (!error) {
334 		kref_get(&r->res_ref);
335 		goto out;
336 	}
337 	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
338 	if (error)
339 		goto out;
340 
341 	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
342 
343 	if (dlm_no_directory(ls))
344 		goto out;
345 
346 	if (r->res_nodeid == -1) {
347 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
348 		r->res_first_lkid = 0;
349 	} else if (r->res_nodeid > 0) {
350 		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
351 		r->res_first_lkid = 0;
352 	} else {
353 		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
354 		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
355 	}
356  out:
357 	*r_ret = r;
358 	return error;
359 }
360 
361 static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
362 		      unsigned int flags, struct dlm_rsb **r_ret)
363 {
364 	int error;
365 	write_lock(&ls->ls_rsbtbl[b].lock);
366 	error = _search_rsb(ls, name, len, b, flags, r_ret);
367 	write_unlock(&ls->ls_rsbtbl[b].lock);
368 	return error;
369 }
370 
371 /*
372  * Find rsb in rsbtbl and potentially create/add one
373  *
374  * Delaying the release of rsb's has a similar benefit to applications keeping
375  * NL locks on an rsb, but without the guarantee that the cached master value
376  * will still be valid when the rsb is reused.  Apps aren't always smart enough
377  * to keep NL locks on an rsb that they may lock again shortly; this can lead
378  * to excessive master lookups and removals if we don't delay the release.
379  *
380  * Searching for an rsb means looking through both the normal list and toss
381  * list.  When found on the toss list the rsb is moved to the normal list with
382  * ref count of 1; when found on normal list the ref count is incremented.
383  */
384 
385 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
386 		    unsigned int flags, struct dlm_rsb **r_ret)
387 {
388 	struct dlm_rsb *r, *tmp;
389 	uint32_t hash, bucket;
390 	int error = 0;
391 
392 	if (dlm_no_directory(ls))
393 		flags |= R_CREATE;
394 
395 	hash = jhash(name, namelen, 0);
396 	bucket = hash & (ls->ls_rsbtbl_size - 1);
397 
398 	error = search_rsb(ls, name, namelen, bucket, flags, &r);
399 	if (!error)
400 		goto out;
401 
402 	if (error == -EBADR && !(flags & R_CREATE))
403 		goto out;
404 
405 	/* the rsb was found but wasn't a master copy */
406 	if (error == -ENOTBLK)
407 		goto out;
408 
409 	error = -ENOMEM;
410 	r = create_rsb(ls, name, namelen);
411 	if (!r)
412 		goto out;
413 
414 	r->res_hash = hash;
415 	r->res_bucket = bucket;
416 	r->res_nodeid = -1;
417 	kref_init(&r->res_ref);
418 
419 	/* With no directory, the master can be set immediately */
420 	if (dlm_no_directory(ls)) {
421 		int nodeid = dlm_dir_nodeid(r);
422 		if (nodeid == dlm_our_nodeid())
423 			nodeid = 0;
424 		r->res_nodeid = nodeid;
425 	}
426 
427 	write_lock(&ls->ls_rsbtbl[bucket].lock);
428 	error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
429 	if (!error) {
430 		write_unlock(&ls->ls_rsbtbl[bucket].lock);
431 		free_rsb(r);
432 		r = tmp;
433 		goto out;
434 	}
435 	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
436 	write_unlock(&ls->ls_rsbtbl[bucket].lock);
437 	error = 0;
438  out:
439 	*r_ret = r;
440 	return error;
441 }
442 
443 int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
444 		 unsigned int flags, struct dlm_rsb **r_ret)
445 {
446 	return find_rsb(ls, name, namelen, flags, r_ret);
447 }
448 
449 /* This is only called to add a reference when the code already holds
450    a valid reference to the rsb, so there's no need for locking. */
451 
452 static inline void hold_rsb(struct dlm_rsb *r)
453 {
454 	kref_get(&r->res_ref);
455 }
456 
457 void dlm_hold_rsb(struct dlm_rsb *r)
458 {
459 	hold_rsb(r);
460 }
461 
462 static void toss_rsb(struct kref *kref)
463 {
464 	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
465 	struct dlm_ls *ls = r->res_ls;
466 
467 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
468 	kref_init(&r->res_ref);
469 	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
470 	r->res_toss_time = jiffies;
471 	if (r->res_lvbptr) {
472 		free_lvb(r->res_lvbptr);
473 		r->res_lvbptr = NULL;
474 	}
475 }
476 
477 /* When all references to the rsb are gone it's transferred to
478    the tossed list for later disposal. */
479 
480 static void put_rsb(struct dlm_rsb *r)
481 {
482 	struct dlm_ls *ls = r->res_ls;
483 	uint32_t bucket = r->res_bucket;
484 
485 	write_lock(&ls->ls_rsbtbl[bucket].lock);
486 	kref_put(&r->res_ref, toss_rsb);
487 	write_unlock(&ls->ls_rsbtbl[bucket].lock);
488 }
489 
490 void dlm_put_rsb(struct dlm_rsb *r)
491 {
492 	put_rsb(r);
493 }
494 
495 /* See comment for unhold_lkb */
496 
497 static void unhold_rsb(struct dlm_rsb *r)
498 {
499 	int rv;
500 	rv = kref_put(&r->res_ref, toss_rsb);
501 	DLM_ASSERT(!rv, dlm_dump_rsb(r););
502 }
503 
504 static void kill_rsb(struct kref *kref)
505 {
506 	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
507 
508 	/* All work is done after the return from kref_put() so we
509 	   can release the write_lock before the remove and free. */
510 
511 	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
512 	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
513 	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
514 	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
515 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
516 	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
517 }
518 
519 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
520    The rsb must exist as long as any lkb's for it do. */
521 
522 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
523 {
524 	hold_rsb(r);
525 	lkb->lkb_resource = r;
526 }
527 
528 static void detach_lkb(struct dlm_lkb *lkb)
529 {
530 	if (lkb->lkb_resource) {
531 		put_rsb(lkb->lkb_resource);
532 		lkb->lkb_resource = NULL;
533 	}
534 }
535 
536 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
537 {
538 	struct dlm_lkb *lkb, *tmp;
539 	uint32_t lkid = 0;
540 	uint16_t bucket;
541 
542 	lkb = allocate_lkb(ls);
543 	if (!lkb)
544 		return -ENOMEM;
545 
546 	lkb->lkb_nodeid = -1;
547 	lkb->lkb_grmode = DLM_LOCK_IV;
548 	kref_init(&lkb->lkb_ref);
549 	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
550 
551 	get_random_bytes(&bucket, sizeof(bucket));
552 	bucket &= (ls->ls_lkbtbl_size - 1);
553 
554 	write_lock(&ls->ls_lkbtbl[bucket].lock);
555 
556 	/* counter can roll over so we must verify lkid is not in use */
557 
558 	while (lkid == 0) {
559 		lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16);
560 
561 		list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
562 				    lkb_idtbl_list) {
563 			if (tmp->lkb_id != lkid)
564 				continue;
565 			lkid = 0;
566 			break;
567 		}
568 	}
569 
570 	lkb->lkb_id = lkid;
571 	list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
572 	write_unlock(&ls->ls_lkbtbl[bucket].lock);
573 
574 	*lkb_ret = lkb;
575 	return 0;
576 }
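
/*
 * Editor's note on the lkid layout produced above: the low 16 bits hold
 * the randomly chosen lkbtbl bucket and the high 16 bits hold that
 * bucket's counter, so a lookup can recover the bucket directly:
 *
 *	uint16_t bucket = lkid & 0xFFFF;	table index
 *	uint32_t seq = lkid >> 16;		per-bucket counter value
 *
 * See __find_lkb()/find_lkb() below.
 */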
577 
578 static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
579 {
580 	uint16_t bucket = lkid & 0xFFFF;
581 	struct dlm_lkb *lkb;
582 
583 	list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
584 		if (lkb->lkb_id == lkid)
585 			return lkb;
586 	}
587 	return NULL;
588 }
589 
590 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
591 {
592 	struct dlm_lkb *lkb;
593 	uint16_t bucket = lkid & 0xFFFF;
594 
595 	if (bucket >= ls->ls_lkbtbl_size)
596 		return -EBADSLT;
597 
598 	read_lock(&ls->ls_lkbtbl[bucket].lock);
599 	lkb = __find_lkb(ls, lkid);
600 	if (lkb)
601 		kref_get(&lkb->lkb_ref);
602 	read_unlock(&ls->ls_lkbtbl[bucket].lock);
603 
604 	*lkb_ret = lkb;
605 	return lkb ? 0 : -ENOENT;
606 }
607 
608 static void kill_lkb(struct kref *kref)
609 {
610 	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
611 
612 	/* All work is done after the return from kref_put() so we
613 	   can release the write_lock before the detach_lkb */
614 
615 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
616 }
617 
618 /* __put_lkb() is used when an lkb may not have an rsb attached to
619    it so we need to provide the lockspace explicitly */
620 
621 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
622 {
623 	uint16_t bucket = lkb->lkb_id & 0xFFFF;
624 
625 	write_lock(&ls->ls_lkbtbl[bucket].lock);
626 	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
627 		list_del(&lkb->lkb_idtbl_list);
628 		write_unlock(&ls->ls_lkbtbl[bucket].lock);
629 
630 		detach_lkb(lkb);
631 
632 		/* for local/process lkbs, lvbptr points to caller's lksb */
633 		if (lkb->lkb_lvbptr && is_master_copy(lkb))
634 			free_lvb(lkb->lkb_lvbptr);
635 		free_lkb(lkb);
636 		return 1;
637 	} else {
638 		write_unlock(&ls->ls_lkbtbl[bucket].lock);
639 		return 0;
640 	}
641 }
642 
643 int dlm_put_lkb(struct dlm_lkb *lkb)
644 {
645 	struct dlm_ls *ls;
646 
647 	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
648 	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
649 
650 	ls = lkb->lkb_resource->res_ls;
651 	return __put_lkb(ls, lkb);
652 }
653 
654 /* This is only called to add a reference when the code already holds
655    a valid reference to the lkb, so there's no need for locking. */
656 
657 static inline void hold_lkb(struct dlm_lkb *lkb)
658 {
659 	kref_get(&lkb->lkb_ref);
660 }
661 
662 /* This is called when we need to remove a reference and are certain
663    it's not the last ref.  e.g. del_lkb is always called between a
664    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
665    put_lkb would work fine, but would involve unnecessary locking */
666 
667 static inline void unhold_lkb(struct dlm_lkb *lkb)
668 {
669 	int rv;
670 	rv = kref_put(&lkb->lkb_ref, kill_lkb);
671 	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
672 }
673 
674 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
675 			    int mode)
676 {
677 	struct dlm_lkb *lkb = NULL;
678 
679 	list_for_each_entry(lkb, head, lkb_statequeue)
680 		if (lkb->lkb_rqmode < mode)
681 			break;
682 
683 	if (!lkb)
684 		list_add_tail(new, head);
685 	else
686 		__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
687 }
688 
689 /* add/remove lkb to rsb's grant/convert/wait queue */
690 
691 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
692 {
693 	kref_get(&lkb->lkb_ref);
694 
695 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
696 
697 	lkb->lkb_status = status;
698 
699 	switch (status) {
700 	case DLM_LKSTS_WAITING:
701 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
702 			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
703 		else
704 			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
705 		break;
706 	case DLM_LKSTS_GRANTED:
707 		/* convention says granted locks kept in order of grmode */
708 		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
709 				lkb->lkb_grmode);
710 		break;
711 	case DLM_LKSTS_CONVERT:
712 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
713 			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
714 		else
715 			list_add_tail(&lkb->lkb_statequeue,
716 				      &r->res_convertqueue);
717 		break;
718 	default:
719 		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
720 	}
721 }
722 
723 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
724 {
725 	lkb->lkb_status = 0;
726 	list_del(&lkb->lkb_statequeue);
727 	unhold_lkb(lkb);
728 }
729 
730 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
731 {
732 	hold_lkb(lkb);
733 	del_lkb(r, lkb);
734 	add_lkb(r, lkb, sts);
735 	unhold_lkb(lkb);
736 }
737 
738 /* add/remove lkb from global waiters list of lkb's waiting for
739    a reply from a remote node */
740 
741 static void add_to_waiters(struct dlm_lkb *lkb, int mstype)
742 {
743 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
744 
745 	mutex_lock(&ls->ls_waiters_mutex);
746 	if (lkb->lkb_wait_type) {
747 		log_print("add_to_waiters error %d", lkb->lkb_wait_type);
748 		goto out;
749 	}
750 	lkb->lkb_wait_type = mstype;
751 	kref_get(&lkb->lkb_ref);
752 	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
753  out:
754 	mutex_unlock(&ls->ls_waiters_mutex);
755 }
756 
757 static int _remove_from_waiters(struct dlm_lkb *lkb)
758 {
759 	int error = 0;
760 
761 	if (!lkb->lkb_wait_type) {
762 		log_print("remove_from_waiters error");
763 		error = -EINVAL;
764 		goto out;
765 	}
766 	lkb->lkb_wait_type = 0;
767 	list_del(&lkb->lkb_wait_reply);
768 	unhold_lkb(lkb);
769  out:
770 	return error;
771 }
772 
773 static int remove_from_waiters(struct dlm_lkb *lkb)
774 {
775 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
776 	int error;
777 
778 	mutex_lock(&ls->ls_waiters_mutex);
779 	error = _remove_from_waiters(lkb);
780 	mutex_unlock(&ls->ls_waiters_mutex);
781 	return error;
782 }
783 
784 static void dir_remove(struct dlm_rsb *r)
785 {
786 	int to_nodeid;
787 
788 	if (dlm_no_directory(r->res_ls))
789 		return;
790 
791 	to_nodeid = dlm_dir_nodeid(r);
792 	if (to_nodeid != dlm_our_nodeid())
793 		send_remove(r);
794 	else
795 		dlm_dir_remove_entry(r->res_ls, to_nodeid,
796 				     r->res_name, r->res_length);
797 }
798 
799 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
800    found since they are in order of newest to oldest? */
801 
802 static int shrink_bucket(struct dlm_ls *ls, int b)
803 {
804 	struct dlm_rsb *r;
805 	int count = 0, found;
806 
807 	for (;;) {
808 		found = 0;
809 		write_lock(&ls->ls_rsbtbl[b].lock);
810 		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
811 					    res_hashchain) {
812 			if (!time_after_eq(jiffies, r->res_toss_time +
813 					   dlm_config.toss_secs * HZ))
814 				continue;
815 			found = 1;
816 			break;
817 		}
818 
819 		if (!found) {
820 			write_unlock(&ls->ls_rsbtbl[b].lock);
821 			break;
822 		}
823 
824 		if (kref_put(&r->res_ref, kill_rsb)) {
825 			list_del(&r->res_hashchain);
826 			write_unlock(&ls->ls_rsbtbl[b].lock);
827 
828 			if (is_master(r))
829 				dir_remove(r);
830 			free_rsb(r);
831 			count++;
832 		} else {
833 			write_unlock(&ls->ls_rsbtbl[b].lock);
834 			log_error(ls, "tossed rsb in use %s", r->res_name);
835 		}
836 	}
837 
838 	return count;
839 }
840 
841 void dlm_scan_rsbs(struct dlm_ls *ls)
842 {
843 	int i;
844 
845 	if (dlm_locking_stopped(ls))
846 		return;
847 
848 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
849 		shrink_bucket(ls, i);
850 		cond_resched();
851 	}
852 }
853 
854 /* lkb is master or local copy */
855 
856 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
857 {
858 	int b, len = r->res_ls->ls_lvblen;
859 
860 	/* b=1 lvb returned to caller
861 	   b=0 lvb written to rsb or invalidated
862 	   b=-1 do nothing */
863 
864 	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
865 
866 	if (b == 1) {
867 		if (!lkb->lkb_lvbptr)
868 			return;
869 
870 		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
871 			return;
872 
873 		if (!r->res_lvbptr)
874 			return;
875 
876 		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
877 		lkb->lkb_lvbseq = r->res_lvbseq;
878 
879 	} else if (b == 0) {
880 		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
881 			rsb_set_flag(r, RSB_VALNOTVALID);
882 			return;
883 		}
884 
885 		if (!lkb->lkb_lvbptr)
886 			return;
887 
888 		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
889 			return;
890 
891 		if (!r->res_lvbptr)
892 			r->res_lvbptr = allocate_lvb(r->res_ls);
893 
894 		if (!r->res_lvbptr)
895 			return;
896 
897 		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
898 		r->res_lvbseq++;
899 		lkb->lkb_lvbseq = r->res_lvbseq;
900 		rsb_clear_flag(r, RSB_VALNOTVALID);
901 	}
902 
903 	if (rsb_flag(r, RSB_VALNOTVALID))
904 		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
905 }
906 
907 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
908 {
909 	if (lkb->lkb_grmode < DLM_LOCK_PW)
910 		return;
911 
912 	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
913 		rsb_set_flag(r, RSB_VALNOTVALID);
914 		return;
915 	}
916 
917 	if (!lkb->lkb_lvbptr)
918 		return;
919 
920 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
921 		return;
922 
923 	if (!r->res_lvbptr)
924 		r->res_lvbptr = allocate_lvb(r->res_ls);
925 
926 	if (!r->res_lvbptr)
927 		return;
928 
929 	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
930 	r->res_lvbseq++;
931 	rsb_clear_flag(r, RSB_VALNOTVALID);
932 }
933 
934 /* lkb is process copy (pc) */
935 
936 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
937 			    struct dlm_message *ms)
938 {
939 	int b;
940 
941 	if (!lkb->lkb_lvbptr)
942 		return;
943 
944 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
945 		return;
946 
947 	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
948 	if (b == 1) {
949 		int len = receive_extralen(ms);
950 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
951 		lkb->lkb_lvbseq = ms->m_lvbseq;
952 	}
953 }
954 
955 /* Manipulate lkb's on rsb's convert/granted/waiting queues
956    remove_lock -- used for unlock, removes lkb from granted
957    revert_lock -- used for cancel, moves lkb from convert to granted
958    grant_lock  -- used for request and convert, adds lkb to granted or
959                   moves lkb from convert or waiting to granted
960 
961    Each of these is used for master or local copy lkb's.  There is
962    also a _pc() variation used to make the corresponding change on
963    a process copy (pc) lkb. */
964 
965 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
966 {
967 	del_lkb(r, lkb);
968 	lkb->lkb_grmode = DLM_LOCK_IV;
969 	/* this unhold undoes the original ref from create_lkb()
970 	   so this leads to the lkb being freed */
971 	unhold_lkb(lkb);
972 }
973 
974 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
975 {
976 	set_lvb_unlock(r, lkb);
977 	_remove_lock(r, lkb);
978 }
979 
980 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
981 {
982 	_remove_lock(r, lkb);
983 }
984 
985 static void revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
986 {
987 	lkb->lkb_rqmode = DLM_LOCK_IV;
988 
989 	switch (lkb->lkb_status) {
990 	case DLM_LKSTS_GRANTED:
991 		break;
992 	case DLM_LKSTS_CONVERT:
993 		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
994 		break;
995 	case DLM_LKSTS_WAITING:
996 		del_lkb(r, lkb);
997 		lkb->lkb_grmode = DLM_LOCK_IV;
998 		/* this unhold undoes the original ref from create_lkb()
999 		   so this leads to the lkb being freed */
1000 		unhold_lkb(lkb);
1001 		break;
1002 	default:
1003 		log_print("invalid status for revert %d", lkb->lkb_status);
1004 	}
1005 }
1006 
1007 static void revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1008 {
1009 	revert_lock(r, lkb);
1010 }
1011 
1012 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1013 {
1014 	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1015 		lkb->lkb_grmode = lkb->lkb_rqmode;
1016 		if (lkb->lkb_status)
1017 			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1018 		else
1019 			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1020 	}
1021 
1022 	lkb->lkb_rqmode = DLM_LOCK_IV;
1023 }
1024 
1025 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1026 {
1027 	set_lvb_lock(r, lkb);
1028 	_grant_lock(r, lkb);
1029 	lkb->lkb_highbast = 0;
1030 }
1031 
1032 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1033 			  struct dlm_message *ms)
1034 {
1035 	set_lvb_lock_pc(r, lkb, ms);
1036 	_grant_lock(r, lkb);
1037 }
1038 
1039 /* called by grant_pending_locks() which means an async grant message must
1040    be sent to the requesting node in addition to granting the lock if the
1041    lkb belongs to a remote node. */
1042 
1043 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1044 {
1045 	grant_lock(r, lkb);
1046 	if (is_master_copy(lkb))
1047 		send_grant(r, lkb);
1048 	else
1049 		queue_cast(r, lkb, 0);
1050 }
1051 
1052 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1053 {
1054 	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1055 					   lkb_statequeue);
1056 	if (lkb->lkb_id == first->lkb_id)
1057 		return 1;
1058 
1059 	return 0;
1060 }
1061 
1062 /* Check if the given lkb conflicts with another lkb on the queue. */
1063 
1064 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1065 {
1066 	struct dlm_lkb *this;
1067 
1068 	list_for_each_entry(this, head, lkb_statequeue) {
1069 		if (this == lkb)
1070 			continue;
1071 		if (!modes_compat(this, lkb))
1072 			return 1;
1073 	}
1074 	return 0;
1075 }
1076 
1077 /*
1078  * "A conversion deadlock arises with a pair of lock requests in the converting
1079  * queue for one resource.  The granted mode of each lock blocks the requested
1080  * mode of the other lock."
1081  *
1082  * Part 2: if the granted mode of lkb is preventing the first lkb in the
1083  * convert queue from being granted, then demote lkb (set grmode to NL).
1084  * This second form requires that we check for conv-deadlk even when
1085  * now == 0 in _can_be_granted().
1086  *
1087  * Example:
1088  * Granted Queue: empty
1089  * Convert Queue: NL->EX (first lock)
1090  *                PR->EX (second lock)
1091  *
1092  * The first lock can't be granted because of the granted mode of the second
1093  * lock and the second lock can't be granted because it's not first in the
1094  * list.  We demote the granted mode of the second lock (the lkb passed to this
1095  * function).
1096  *
1097  * After the resolution, the "grant pending" function needs to go back and try
1098  * to grant locks on the convert queue again since the first lock can now be
1099  * granted.
1100  */
1101 
1102 static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
1103 {
1104 	struct dlm_lkb *this, *first = NULL, *self = NULL;
1105 
1106 	list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
1107 		if (!first)
1108 			first = this;
1109 		if (this == lkb) {
1110 			self = lkb;
1111 			continue;
1112 		}
1113 
1114 		if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
1115 			return 1;
1116 	}
1117 
1118 	/* if lkb is on the convert queue and is preventing the first
1119 	   from being granted, then there's deadlock and we demote lkb.
1120 	   multiple converting locks may need to do this before the first
1121 	   converting lock can be granted. */
1122 
1123 	if (self && self != first) {
1124 		if (!modes_compat(lkb, first) &&
1125 		    !queue_conflict(&rsb->res_grantqueue, first))
1126 			return 1;
1127 	}
1128 
1129 	return 0;
1130 }
1131 
1132 /*
1133  * Return 1 if the lock can be granted, 0 otherwise.
1134  * Also detect and resolve conversion deadlocks.
1135  *
1136  * lkb is the lock to be granted
1137  *
1138  * now is 1 if the function is being called in the context of the
1139  * immediate request, it is 0 if called later, after the lock has been
1140  * queued.
1141  *
1142  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1143  */
1144 
1145 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1146 {
1147 	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1148 
1149 	/*
1150 	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
1151 	 * a new request for a NL mode lock being blocked.
1152 	 *
1153 	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1154 	 * request, then it would be granted.  In essence, the use of this flag
1155 	 * tells the Lock Manager to expedite this request by not considering
1156 	 * what may be in the CONVERTING or WAITING queues...  As of this
1157 	 * writing, the EXPEDITE flag can be used only with new requests for NL
1158 	 * mode locks.  This flag is not valid for conversion requests.
1159 	 *
1160 	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1161 	 * conversion or used with a non-NL requested mode.  We also know an
1162 	 * EXPEDITE request is always granted immediately, so now must always
1163 	 * be 1.  The full condition to grant an expedite request: (now &&
1164 	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1165 	 * therefore be shortened to just checking the flag.
1166 	 */
1167 
1168 	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1169 		return 1;
1170 
1171 	/*
1172 	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1173 	 * added to the remaining conditions.
1174 	 */
1175 
1176 	if (queue_conflict(&r->res_grantqueue, lkb))
1177 		goto out;
1178 
1179 	/*
1180 	 * 6-3: By default, a conversion request is immediately granted if the
1181 	 * requested mode is compatible with the modes of all other granted
1182 	 * locks
1183 	 */
1184 
1185 	if (queue_conflict(&r->res_convertqueue, lkb))
1186 		goto out;
1187 
1188 	/*
1189 	 * 6-5: But the default algorithm for deciding whether to grant or
1190 	 * queue conversion requests does not by itself guarantee that such
1191 	 * requests are serviced on a "first come first serve" basis.  This, in
1192 	 * turn, can lead to a phenomenon known as "indefinite postponement".
1193 	 *
1194 	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
1195 	 * the system service employed to request a lock conversion.  This flag
1196 	 * forces certain conversion requests to be queued, even if they are
1197 	 * compatible with the granted modes of other locks on the same
1198 	 * resource.  Thus, the use of this flag results in conversion requests
1199 	 * being ordered on a "first come first serve" basis.
1200 	 *
1201 	 * DCT: This condition is all about new conversions being able to occur
1202 	 * "in place" while the lock remains on the granted queue (assuming
1203 	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1204 	 * doesn't _have_ to go onto the convert queue where it's processed in
1205 	 * order.  The "now" variable is necessary to distinguish converts
1206 	 * being received and processed for the first time now, because once a
1207 	 * convert is moved to the conversion queue the condition below applies
1208 	 * requiring fifo granting.
1209 	 */
1210 
1211 	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1212 		return 1;
1213 
1214 	/*
1215 	 * The NOORDER flag is set to avoid the standard vms rules on grant
1216 	 * order.
1217 	 */
1218 
1219 	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1220 		return 1;
1221 
1222 	/*
1223 	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1224 	 * granted until all other conversion requests ahead of it are granted
1225 	 * and/or canceled.
1226 	 */
1227 
1228 	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1229 		return 1;
1230 
1231 	/*
1232 	 * 6-4: By default, a new request is immediately granted only if all
1233 	 * three of the following conditions are satisfied when the request is
1234 	 * issued:
1235 	 * - The queue of ungranted conversion requests for the resource is
1236 	 *   empty.
1237 	 * - The queue of ungranted new requests for the resource is empty.
1238 	 * - The mode of the new request is compatible with the most
1239 	 *   restrictive mode of all granted locks on the resource.
1240 	 */
1241 
1242 	if (now && !conv && list_empty(&r->res_convertqueue) &&
1243 	    list_empty(&r->res_waitqueue))
1244 		return 1;
1245 
1246 	/*
1247 	 * 6-4: Once a lock request is in the queue of ungranted new requests,
1248 	 * it cannot be granted until the queue of ungranted conversion
1249 	 * requests is empty, all ungranted new requests ahead of it are
1250 	 * granted and/or canceled, and it is compatible with the granted mode
1251 	 * of the most restrictive lock granted on the resource.
1252 	 */
1253 
1254 	if (!now && !conv && list_empty(&r->res_convertqueue) &&
1255 	    first_in_list(lkb, &r->res_waitqueue))
1256 		return 1;
1257 
1258  out:
1259 	/*
1260 	 * The following, enabled by CONVDEADLK, departs from VMS.
1261 	 */
1262 
1263 	if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
1264 	    conversion_deadlock_detect(r, lkb)) {
1265 		lkb->lkb_grmode = DLM_LOCK_NL;
1266 		lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1267 	}
1268 
1269 	return 0;
1270 }
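
/*
 * Example (editor's note): with a PR lock on the grant queue, a new EX
 * request conflicts in queue_conflict() and falls through to return 0;
 * do_request() below then queues it with -EINPROGRESS, or fails it with
 * -EAGAIN if DLM_LKF_NOQUEUE was given.
 */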
1271 
1272 /*
1273  * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
1274  * simple way to provide a big optimization to applications that can use them.
1275  */
1276 
1277 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1278 {
1279 	uint32_t flags = lkb->lkb_exflags;
1280 	int rv;
1281 	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1282 
1283 	rv = _can_be_granted(r, lkb, now);
1284 	if (rv)
1285 		goto out;
1286 
1287 	if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
1288 		goto out;
1289 
1290 	if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
1291 		alt = DLM_LOCK_PR;
1292 	else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
1293 		alt = DLM_LOCK_CW;
1294 
1295 	if (alt) {
1296 		lkb->lkb_rqmode = alt;
1297 		rv = _can_be_granted(r, lkb, now);
1298 		if (rv)
1299 			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1300 		else
1301 			lkb->lkb_rqmode = rqmode;
1302 	}
1303  out:
1304 	return rv;
1305 }
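
/*
 * Example (editor's note): a PW request carrying DLM_LKF_ALTPR that
 * cannot be granted is retried with rqmode PR; if that succeeds, the
 * lock is granted in PR mode and the caller sees DLM_SBF_ALTMODE in
 * the lksb's sb_flags.
 */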
1306 
1307 static int grant_pending_convert(struct dlm_rsb *r, int high)
1308 {
1309 	struct dlm_lkb *lkb, *s;
1310 	int hi, demoted, quit, grant_restart, demote_restart;
1311 
1312 	quit = 0;
1313  restart:
1314 	grant_restart = 0;
1315 	demote_restart = 0;
1316 	hi = DLM_LOCK_IV;
1317 
1318 	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1319 		demoted = is_demoted(lkb);
1320 		if (can_be_granted(r, lkb, 0)) {
1321 			grant_lock_pending(r, lkb);
1322 			grant_restart = 1;
1323 		} else {
1324 			hi = max_t(int, lkb->lkb_rqmode, hi);
1325 			if (!demoted && is_demoted(lkb))
1326 				demote_restart = 1;
1327 		}
1328 	}
1329 
1330 	if (grant_restart)
1331 		goto restart;
1332 	if (demote_restart && !quit) {
1333 		quit = 1;
1334 		goto restart;
1335 	}
1336 
1337 	return max_t(int, high, hi);
1338 }
1339 
1340 static int grant_pending_wait(struct dlm_rsb *r, int high)
1341 {
1342 	struct dlm_lkb *lkb, *s;
1343 
1344 	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1345 		if (can_be_granted(r, lkb, 0))
1346 			grant_lock_pending(r, lkb);
1347 		else
1348 			high = max_t(int, lkb->lkb_rqmode, high);
1349 	}
1350 
1351 	return high;
1352 }
1353 
1354 static void grant_pending_locks(struct dlm_rsb *r)
1355 {
1356 	struct dlm_lkb *lkb, *s;
1357 	int high = DLM_LOCK_IV;
1358 
1359 	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1360 
1361 	high = grant_pending_convert(r, high);
1362 	high = grant_pending_wait(r, high);
1363 
1364 	if (high == DLM_LOCK_IV)
1365 		return;
1366 
1367 	/*
1368 	 * If there are locks left on the wait/convert queue then send blocking
1369 	 * ASTs to granted locks based on the largest requested mode (high)
1370 	 * found above. FIXME: highbast < high comparison not valid for PR/CW.
1371 	 */
1372 
1373 	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1374 		if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
1375 		    !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
1376 			queue_bast(r, lkb, high);
1377 			lkb->lkb_highbast = high;
1378 		}
1379 	}
1380 }
1381 
1382 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1383 			    struct dlm_lkb *lkb)
1384 {
1385 	struct dlm_lkb *gr;
1386 
1387 	list_for_each_entry(gr, head, lkb_statequeue) {
1388 		if (gr->lkb_bastaddr &&
1389 		    gr->lkb_highbast < lkb->lkb_rqmode &&
1390 		    !modes_compat(gr, lkb)) {
1391 			queue_bast(r, gr, lkb->lkb_rqmode);
1392 			gr->lkb_highbast = lkb->lkb_rqmode;
1393 		}
1394 	}
1395 }
1396 
1397 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1398 {
1399 	send_bast_queue(r, &r->res_grantqueue, lkb);
1400 }
1401 
1402 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1403 {
1404 	send_bast_queue(r, &r->res_grantqueue, lkb);
1405 	send_bast_queue(r, &r->res_convertqueue, lkb);
1406 }
1407 
1408 /* set_master(r, lkb) -- set the master nodeid of a resource
1409 
1410    The purpose of this function is to set the nodeid field in the given
1411    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1412    known, it can just be copied to the lkb and the function will return
1413    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1414    before it can be copied to the lkb.
1415 
1416    When the rsb nodeid is being looked up remotely, the initial lkb
1417    causing the lookup is kept on the ls_waiters list waiting for the
1418    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1419    on the rsb's res_lookup list until the master is verified.
1420 
1421    Return values:
1422    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1423    1: the rsb master is not available and the lkb has been placed on
1424       a wait queue
1425 */
1426 
1427 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1428 {
1429 	struct dlm_ls *ls = r->res_ls;
1430 	int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1431 
1432 	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1433 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1434 		r->res_first_lkid = lkb->lkb_id;
1435 		lkb->lkb_nodeid = r->res_nodeid;
1436 		return 0;
1437 	}
1438 
1439 	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1440 		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1441 		return 1;
1442 	}
1443 
1444 	if (r->res_nodeid == 0) {
1445 		lkb->lkb_nodeid = 0;
1446 		return 0;
1447 	}
1448 
1449 	if (r->res_nodeid > 0) {
1450 		lkb->lkb_nodeid = r->res_nodeid;
1451 		return 0;
1452 	}
1453 
1454 	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
1455 
1456 	dir_nodeid = dlm_dir_nodeid(r);
1457 
1458 	if (dir_nodeid != our_nodeid) {
1459 		r->res_first_lkid = lkb->lkb_id;
1460 		send_lookup(r, lkb);
1461 		return 1;
1462 	}
1463 
1464 	for (;;) {
1465 		/* It's possible for dlm_scand to remove an old rsb for
1466 		   this same resource from the toss list, for us to create
1467 		   a new one, look up the master locally, and find it
1468 		   already exists just before dlm_scand does the
1469 		   dir_remove() on the previous rsb. */
1470 
1471 		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1472 				       r->res_length, &ret_nodeid);
1473 		if (!error)
1474 			break;
1475 		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1476 		schedule();
1477 	}
1478 
1479 	if (ret_nodeid == our_nodeid) {
1480 		r->res_first_lkid = 0;
1481 		r->res_nodeid = 0;
1482 		lkb->lkb_nodeid = 0;
1483 	} else {
1484 		r->res_first_lkid = lkb->lkb_id;
1485 		r->res_nodeid = ret_nodeid;
1486 		lkb->lkb_nodeid = ret_nodeid;
1487 	}
1488 	return 0;
1489 }
1490 
1491 static void process_lookup_list(struct dlm_rsb *r)
1492 {
1493 	struct dlm_lkb *lkb, *safe;
1494 
1495 	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
1496 		list_del(&lkb->lkb_rsb_lookup);
1497 		_request_lock(r, lkb);
1498 		schedule();
1499 	}
1500 }
1501 
1502 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
1503 
1504 static void confirm_master(struct dlm_rsb *r, int error)
1505 {
1506 	struct dlm_lkb *lkb;
1507 
1508 	if (!r->res_first_lkid)
1509 		return;
1510 
1511 	switch (error) {
1512 	case 0:
1513 	case -EINPROGRESS:
1514 		r->res_first_lkid = 0;
1515 		process_lookup_list(r);
1516 		break;
1517 
1518 	case -EAGAIN:
1519 		/* the remote master didn't queue our NOQUEUE request;
1520 		   make a waiting lkb the first_lkid */
1521 
1522 		r->res_first_lkid = 0;
1523 
1524 		if (!list_empty(&r->res_lookup)) {
1525 			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1526 					 lkb_rsb_lookup);
1527 			list_del(&lkb->lkb_rsb_lookup);
1528 			r->res_first_lkid = lkb->lkb_id;
1529 			_request_lock(r, lkb);
1530 		} else
1531 			r->res_nodeid = -1;
1532 		break;
1533 
1534 	default:
1535 		log_error(r->res_ls, "confirm_master unknown error %d", error);
1536 	}
1537 }
1538 
1539 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
1540 			 int namelen, uint32_t parent_lkid, void *ast,
1541 			 void *astarg, void *bast, struct dlm_args *args)
1542 {
1543 	int rv = -EINVAL;
1544 
1545 	/* check for invalid arg usage */
1546 
1547 	if (mode < 0 || mode > DLM_LOCK_EX)
1548 		goto out;
1549 
1550 	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1551 		goto out;
1552 
1553 	if (flags & DLM_LKF_CANCEL)
1554 		goto out;
1555 
1556 	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1557 		goto out;
1558 
1559 	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1560 		goto out;
1561 
1562 	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1563 		goto out;
1564 
1565 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1566 		goto out;
1567 
1568 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1569 		goto out;
1570 
1571 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1572 		goto out;
1573 
1574 	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1575 		goto out;
1576 
1577 	if (!ast || !lksb)
1578 		goto out;
1579 
1580 	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1581 		goto out;
1582 
1583 	/* parent/child locks not yet supported */
1584 	if (parent_lkid)
1585 		goto out;
1586 
1587 	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1588 		goto out;
1589 
1590 	/* these args will be copied to the lkb in validate_lock_args,
1591 	   it cannot be done now because when converting locks, fields in
1592 	   an active lkb cannot be modified before locking the rsb */
1593 
1594 	args->flags = flags;
1595 	args->astaddr = ast;
1596 	args->astparam = (long) astarg;
1597 	args->bastaddr = bast;
1598 	args->mode = mode;
1599 	args->lksb = lksb;
1600 	rv = 0;
1601  out:
1602 	return rv;
1603 }
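
/*
 * Examples of the checks above (editor's note, argument lists
 * abbreviated): DLM_LKF_EXPEDITE is accepted only for a new DLM_LOCK_NL
 * request:
 *
 *	set_lock_args(DLM_LOCK_NL, lksb, DLM_LKF_EXPEDITE, ...)  returns 0
 *	set_lock_args(DLM_LOCK_PR, lksb, DLM_LKF_EXPEDITE, ...)  -EINVAL
 *	set_lock_args(mode, lksb, DLM_LKF_EXPEDITE|DLM_LKF_CONVERT, ...)
 *								 -EINVAL
 */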
1604 
1605 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1606 {
1607 	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1608 		      DLM_LKF_FORCEUNLOCK))
1609 		return -EINVAL;
1610 
1611 	args->flags = flags;
1612 	args->astparam = (long) astarg;
1613 	return 0;
1614 }
1615 
1616 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1617 			      struct dlm_args *args)
1618 {
1619 	int rv = -EINVAL;
1620 
1621 	if (args->flags & DLM_LKF_CONVERT) {
1622 		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1623 			goto out;
1624 
1625 		if (args->flags & DLM_LKF_QUECVT &&
1626 		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
1627 			goto out;
1628 
1629 		rv = -EBUSY;
1630 		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
1631 			goto out;
1632 
1633 		if (lkb->lkb_wait_type)
1634 			goto out;
1635 	}
1636 
1637 	lkb->lkb_exflags = args->flags;
1638 	lkb->lkb_sbflags = 0;
1639 	lkb->lkb_astaddr = args->astaddr;
1640 	lkb->lkb_astparam = args->astparam;
1641 	lkb->lkb_bastaddr = args->bastaddr;
1642 	lkb->lkb_rqmode = args->mode;
1643 	lkb->lkb_lksb = args->lksb;
1644 	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
1645 	lkb->lkb_ownpid = (int) current->pid;
1646 	rv = 0;
1647  out:
1648 	return rv;
1649 }
1650 
1651 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1652 {
1653 	int rv = -EINVAL;
1654 
1655 	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
1656 		goto out;
1657 
1658 	if (args->flags & DLM_LKF_FORCEUNLOCK)
1659 		goto out_ok;
1660 
1661 	if (args->flags & DLM_LKF_CANCEL &&
1662 	    lkb->lkb_status == DLM_LKSTS_GRANTED)
1663 		goto out;
1664 
1665 	if (!(args->flags & DLM_LKF_CANCEL) &&
1666 	    lkb->lkb_status != DLM_LKSTS_GRANTED)
1667 		goto out;
1668 
1669 	rv = -EBUSY;
1670 	if (lkb->lkb_wait_type)
1671 		goto out;
1672 
1673  out_ok:
1674 	lkb->lkb_exflags = args->flags;
1675 	lkb->lkb_sbflags = 0;
1676 	lkb->lkb_astparam = args->astparam;
1677 
1678 	rv = 0;
1679  out:
1680 	return rv;
1681 }
1682 
1683 /*
1684  * Four stage 4 varieties:
1685  * do_request(), do_convert(), do_unlock(), do_cancel()
1686  * These are called on the master node for the given lock and
1687  * from the central locking logic.
1688  */
1689 
1690 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1691 {
1692 	int error = 0;
1693 
1694 	if (can_be_granted(r, lkb, 1)) {
1695 		grant_lock(r, lkb);
1696 		queue_cast(r, lkb, 0);
1697 		goto out;
1698 	}
1699 
1700 	if (can_be_queued(lkb)) {
1701 		error = -EINPROGRESS;
1702 		add_lkb(r, lkb, DLM_LKSTS_WAITING);
1703 		send_blocking_asts(r, lkb);
1704 		goto out;
1705 	}
1706 
1707 	error = -EAGAIN;
1708 	if (force_blocking_asts(lkb))
1709 		send_blocking_asts_all(r, lkb);
1710 	queue_cast(r, lkb, -EAGAIN);
1711 
1712  out:
1713 	return error;
1714 }
1715 
1716 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
1717 {
1718 	int error = 0;
1719 
1720 	/* changing an existing lock may allow others to be granted */
1721 
1722 	if (can_be_granted(r, lkb, 1)) {
1723 		grant_lock(r, lkb);
1724 		queue_cast(r, lkb, 0);
1725 		grant_pending_locks(r);
1726 		goto out;
1727 	}
1728 
1729 	if (can_be_queued(lkb)) {
1730 		if (is_demoted(lkb))
1731 			grant_pending_locks(r);
1732 		error = -EINPROGRESS;
1733 		del_lkb(r, lkb);
1734 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
1735 		send_blocking_asts(r, lkb);
1736 		goto out;
1737 	}
1738 
1739 	error = -EAGAIN;
1740 	if (force_blocking_asts(lkb))
1741 		send_blocking_asts_all(r, lkb);
1742 	queue_cast(r, lkb, -EAGAIN);
1743 
1744  out:
1745 	return error;
1746 }
1747 
1748 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1749 {
1750 	remove_lock(r, lkb);
1751 	queue_cast(r, lkb, -DLM_EUNLOCK);
1752 	grant_pending_locks(r);
1753 	return -DLM_EUNLOCK;
1754 }
1755 
1756 /* FIXME: if revert_lock() finds that the lkb is granted, we should
1757    skip the queue_cast(ECANCEL).  It indicates that the request/convert
1758    completed (and queued a normal ast) just before the cancel; we don't
1759    want to clobber the sb_result for the normal ast with ECANCEL. */
1760 
1761 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
1762 {
1763 	revert_lock(r, lkb);
1764 	queue_cast(r, lkb, -DLM_ECANCEL);
1765 	grant_pending_locks(r);
1766 	return -DLM_ECANCEL;
1767 }
1768 
1769 /*
1770  * Four stage 3 varieties:
1771  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
1772  */
1773 
1774 /* add a new lkb to a possibly new rsb, called by requesting process */
1775 
1776 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1777 {
1778 	int error;
1779 
1780 	/* set_master: sets lkb nodeid from r */
1781 
1782 	error = set_master(r, lkb);
1783 	if (error < 0)
1784 		goto out;
1785 	if (error) {
1786 		error = 0;
1787 		goto out;
1788 	}
1789 
1790 	if (is_remote(r))
1791 		/* receive_request() calls do_request() on remote node */
1792 		error = send_request(r, lkb);
1793 	else
1794 		error = do_request(r, lkb);
1795  out:
1796 	return error;
1797 }
1798 
1799 /* change some property of an existing lkb, e.g. mode */
1800 
1801 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1802 {
1803 	int error;
1804 
1805 	if (is_remote(r))
1806 		/* receive_convert() calls do_convert() on remote node */
1807 		error = send_convert(r, lkb);
1808 	else
1809 		error = do_convert(r, lkb);
1810 
1811 	return error;
1812 }
1813 
1814 /* remove an existing lkb from the granted queue */
1815 
1816 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1817 {
1818 	int error;
1819 
1820 	if (is_remote(r))
1821 		/* receive_unlock() calls do_unlock() on remote node */
1822 		error = send_unlock(r, lkb);
1823 	else
1824 		error = do_unlock(r, lkb);
1825 
1826 	return error;
1827 }
1828 
1829 /* remove an existing lkb from the convert or wait queue */
1830 
1831 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1832 {
1833 	int error;
1834 
1835 	if (is_remote(r))
1836 		/* receive_cancel() calls do_cancel() on remote node */
1837 		error = send_cancel(r, lkb);
1838 	else
1839 		error = do_cancel(r, lkb);
1840 
1841 	return error;
1842 }
1843 
1844 /*
1845  * Four stage 2 varieties:
1846  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
1847  */
1848 
1849 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
1850 			int len, struct dlm_args *args)
1851 {
1852 	struct dlm_rsb *r;
1853 	int error;
1854 
1855 	error = validate_lock_args(ls, lkb, args);
1856 	if (error)
1857 		goto out;
1858 
1859 	error = find_rsb(ls, name, len, R_CREATE, &r);
1860 	if (error)
1861 		goto out;
1862 
1863 	lock_rsb(r);
1864 
1865 	attach_lkb(r, lkb);
1866 	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
1867 
1868 	error = _request_lock(r, lkb);
1869 
1870 	unlock_rsb(r);
1871 	put_rsb(r);
1872 
1873  out:
1874 	return error;
1875 }
1876 
1877 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1878 			struct dlm_args *args)
1879 {
1880 	struct dlm_rsb *r;
1881 	int error;
1882 
1883 	r = lkb->lkb_resource;
1884 
1885 	hold_rsb(r);
1886 	lock_rsb(r);
1887 
1888 	error = validate_lock_args(ls, lkb, args);
1889 	if (error)
1890 		goto out;
1891 
1892 	error = _convert_lock(r, lkb);
1893  out:
1894 	unlock_rsb(r);
1895 	put_rsb(r);
1896 	return error;
1897 }
1898 
1899 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1900 		       struct dlm_args *args)
1901 {
1902 	struct dlm_rsb *r;
1903 	int error;
1904 
1905 	r = lkb->lkb_resource;
1906 
1907 	hold_rsb(r);
1908 	lock_rsb(r);
1909 
1910 	error = validate_unlock_args(lkb, args);
1911 	if (error)
1912 		goto out;
1913 
1914 	error = _unlock_lock(r, lkb);
1915  out:
1916 	unlock_rsb(r);
1917 	put_rsb(r);
1918 	return error;
1919 }
1920 
1921 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
1922 		       struct dlm_args *args)
1923 {
1924 	struct dlm_rsb *r;
1925 	int error;
1926 
1927 	r = lkb->lkb_resource;
1928 
1929 	hold_rsb(r);
1930 	lock_rsb(r);
1931 
1932 	error = validate_unlock_args(lkb, args);
1933 	if (error)
1934 		goto out;
1935 
1936 	error = _cancel_lock(r, lkb);
1937  out:
1938 	unlock_rsb(r);
1939 	put_rsb(r);
1940 	return error;
1941 }
1942 
1943 /*
1944  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
1945  */
1946 
1947 int dlm_lock(dlm_lockspace_t *lockspace,
1948 	     int mode,
1949 	     struct dlm_lksb *lksb,
1950 	     uint32_t flags,
1951 	     void *name,
1952 	     unsigned int namelen,
1953 	     uint32_t parent_lkid,
1954 	     void (*ast) (void *astarg),
1955 	     void *astarg,
1956 	     void (*bast) (void *astarg, int mode))
1957 {
1958 	struct dlm_ls *ls;
1959 	struct dlm_lkb *lkb;
1960 	struct dlm_args args;
1961 	int error, convert = flags & DLM_LKF_CONVERT;
1962 
1963 	ls = dlm_find_lockspace_local(lockspace);
1964 	if (!ls)
1965 		return -EINVAL;
1966 
1967 	lock_recovery(ls);
1968 
1969 	if (convert)
1970 		error = find_lkb(ls, lksb->sb_lkid, &lkb);
1971 	else
1972 		error = create_lkb(ls, &lkb);
1973 
1974 	if (error)
1975 		goto out;
1976 
1977 	error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
1978 			      astarg, bast, &args);
1979 	if (error)
1980 		goto out_put;
1981 
1982 	if (convert)
1983 		error = convert_lock(ls, lkb, &args);
1984 	else
1985 		error = request_lock(ls, lkb, name, namelen, &args);
1986 
1987 	if (error == -EINPROGRESS)
1988 		error = 0;
1989  out_put:
1990 	if (convert || error)
1991 		__put_lkb(ls, lkb);
1992 	if (error == -EAGAIN)
1993 		error = 0;
1994  out:
1995 	unlock_recovery(ls);
1996 	dlm_put_lockspace(ls);
1997 	return error;
1998 }
1999 
2000 int dlm_unlock(dlm_lockspace_t *lockspace,
2001 	       uint32_t lkid,
2002 	       uint32_t flags,
2003 	       struct dlm_lksb *lksb,
2004 	       void *astarg)
2005 {
2006 	struct dlm_ls *ls;
2007 	struct dlm_lkb *lkb;
2008 	struct dlm_args args;
2009 	int error;
2010 
2011 	ls = dlm_find_lockspace_local(lockspace);
2012 	if (!ls)
2013 		return -EINVAL;
2014 
2015 	lock_recovery(ls);
2016 
2017 	error = find_lkb(ls, lkid, &lkb);
2018 	if (error)
2019 		goto out;
2020 
2021 	error = set_unlock_args(flags, astarg, &args);
2022 	if (error)
2023 		goto out_put;
2024 
2025 	if (flags & DLM_LKF_CANCEL)
2026 		error = cancel_lock(ls, lkb, &args);
2027 	else
2028 		error = unlock_lock(ls, lkb, &args);
2029 
2030 	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2031 		error = 0;
2032  out_put:
2033 	dlm_put_lkb(lkb);
2034  out:
2035 	unlock_recovery(ls);
2036 	dlm_put_lockspace(ls);
2037 	return error;
2038 }
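
/* Example (an illustrative sketch, not part of the dlm): a minimal kernel
   caller taking and dropping an EX lock through the two entry points
   above.  The resource name, the completion-based wait and the helper
   names are assumptions of the sketch (it would also need
   linux/completion.h).  Note the ast fires a second time for the unlock,
   with sb_status == -DLM_EUNLOCK; waiting for that is omitted here.

	static struct dlm_lksb my_lksb;
	static struct completion my_done;

	static void my_ast(void *astarg)
	{
		complete(&my_done);
	}

	static int grab_and_drop(dlm_lockspace_t *ls)
	{
		int error;

		init_completion(&my_done);
		error = dlm_lock(ls, DLM_LOCK_EX, &my_lksb, 0, "my_res", 6,
				 0, my_ast, &my_lksb, NULL);
		if (error)
			return error;
		wait_for_completion(&my_done);
		if (my_lksb.sb_status)
			return my_lksb.sb_status;
		return dlm_unlock(ls, my_lksb.sb_lkid, 0, &my_lksb, &my_lksb);
	}
*/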
2039 
2040 /*
2041  * send/receive routines for remote operations and replies
2042  *
2043  * send_args
2044  * send_common
2045  * send_request			receive_request
2046  * send_convert			receive_convert
2047  * send_unlock			receive_unlock
2048  * send_cancel			receive_cancel
2049  * send_grant			receive_grant
2050  * send_bast			receive_bast
2051  * send_lookup			receive_lookup
2052  * send_remove			receive_remove
2053  *
2054  * 				send_common_reply
2055  * receive_request_reply	send_request_reply
2056  * receive_convert_reply	send_convert_reply
2057  * receive_unlock_reply		send_unlock_reply
2058  * receive_cancel_reply		send_cancel_reply
2059  * receive_lookup_reply		send_lookup_reply
2060  */
2061 
2062 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2063 			  int to_nodeid, int mstype,
2064 			  struct dlm_message **ms_ret,
2065 			  struct dlm_mhandle **mh_ret)
2066 {
2067 	struct dlm_message *ms;
2068 	struct dlm_mhandle *mh;
2069 	char *mb;
2070 	int mb_len = sizeof(struct dlm_message);
2071 
2072 	switch (mstype) {
2073 	case DLM_MSG_REQUEST:
2074 	case DLM_MSG_LOOKUP:
2075 	case DLM_MSG_REMOVE:
2076 		mb_len += r->res_length;
2077 		break;
2078 	case DLM_MSG_CONVERT:
2079 	case DLM_MSG_UNLOCK:
2080 	case DLM_MSG_REQUEST_REPLY:
2081 	case DLM_MSG_CONVERT_REPLY:
2082 	case DLM_MSG_GRANT:
2083 		if (lkb && lkb->lkb_lvbptr)
2084 			mb_len += r->res_ls->ls_lvblen;
2085 		break;
2086 	}
2087 
2088 	/* get_buffer gives us a message handle (mh) that we need to
2089 	   pass into lowcomms_commit and a message buffer (mb) that we
2090 	   write our data into */
2091 
2092 	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2093 	if (!mh)
2094 		return -ENOBUFS;
2095 
2096 	memset(mb, 0, mb_len);
2097 
2098 	ms = (struct dlm_message *) mb;
2099 
2100 	ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2101 	ms->m_header.h_lockspace = r->res_ls->ls_global_id;
2102 	ms->m_header.h_nodeid = dlm_our_nodeid();
2103 	ms->m_header.h_length = mb_len;
2104 	ms->m_header.h_cmd = DLM_MSG;
2105 
2106 	ms->m_type = mstype;
2107 
2108 	*mh_ret = mh;
2109 	*ms_ret = ms;
2110 	return 0;
2111 }
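
/* Sizing sketch (the numbers are hypothetical): a DLM_MSG_REQUEST for a
   10-byte resource name is allocated with mb_len =
   sizeof(struct dlm_message) + 10, and send_args() later copies the name
   into m_extra; a DLM_MSG_CONVERT carrying an lvb instead reserves
   ls_lvblen extra bytes for the lvb data. */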
2112 
2113 /* further lowcomms enhancements or alternate implementations may make
2114    the return value from this function useful at some point */
2115 
2116 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2117 {
2118 	dlm_message_out(ms);
2119 	dlm_lowcomms_commit_buffer(mh);
2120 	return 0;
2121 }
2122 
2123 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2124 		      struct dlm_message *ms)
2125 {
2126 	ms->m_nodeid   = lkb->lkb_nodeid;
2127 	ms->m_pid      = lkb->lkb_ownpid;
2128 	ms->m_lkid     = lkb->lkb_id;
2129 	ms->m_remid    = lkb->lkb_remid;
2130 	ms->m_exflags  = lkb->lkb_exflags;
2131 	ms->m_sbflags  = lkb->lkb_sbflags;
2132 	ms->m_flags    = lkb->lkb_flags;
2133 	ms->m_lvbseq   = lkb->lkb_lvbseq;
2134 	ms->m_status   = lkb->lkb_status;
2135 	ms->m_grmode   = lkb->lkb_grmode;
2136 	ms->m_rqmode   = lkb->lkb_rqmode;
2137 	ms->m_hash     = r->res_hash;
2138 
2139 	/* m_result and m_bastmode are set from function args,
2140 	   not from lkb fields */
2141 
2142 	if (lkb->lkb_bastaddr)
2143 		ms->m_asts |= AST_BAST;
2144 	if (lkb->lkb_astaddr)
2145 		ms->m_asts |= AST_COMP;
2146 
2147 	if (ms->m_type == DLM_MSG_REQUEST || ms->m_type == DLM_MSG_LOOKUP)
2148 		memcpy(ms->m_extra, r->res_name, r->res_length);
2149 
2150 	else if (lkb->lkb_lvbptr)
2151 		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2152 
2153 }
2154 
2155 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2156 {
2157 	struct dlm_message *ms;
2158 	struct dlm_mhandle *mh;
2159 	int to_nodeid, error;
2160 
2161 	add_to_waiters(lkb, mstype);
2162 
2163 	to_nodeid = r->res_nodeid;
2164 
2165 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2166 	if (error)
2167 		goto fail;
2168 
2169 	send_args(r, lkb, ms);
2170 
2171 	error = send_message(mh, ms);
2172 	if (error)
2173 		goto fail;
2174 	return 0;
2175 
2176  fail:
2177 	remove_from_waiters(lkb);
2178 	return error;
2179 }
2180 
2181 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2182 {
2183 	return send_common(r, lkb, DLM_MSG_REQUEST);
2184 }
2185 
2186 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2187 {
2188 	int error;
2189 
2190 	error = send_common(r, lkb, DLM_MSG_CONVERT);
2191 
2192 	/* down conversions go without a reply from the master */
2193 	if (!error && down_conversion(lkb)) {
2194 		remove_from_waiters(lkb);
2195 		r->res_ls->ls_stub_ms.m_result = 0;
2196 		r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
2197 		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2198 	}
2199 
2200 	return error;
2201 }
2202 
2203 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2204    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2205    that the master is still correct. */
2206 
2207 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2208 {
2209 	return send_common(r, lkb, DLM_MSG_UNLOCK);
2210 }
2211 
2212 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2213 {
2214 	return send_common(r, lkb, DLM_MSG_CANCEL);
2215 }
2216 
2217 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2218 {
2219 	struct dlm_message *ms;
2220 	struct dlm_mhandle *mh;
2221 	int to_nodeid, error;
2222 
2223 	to_nodeid = lkb->lkb_nodeid;
2224 
2225 	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2226 	if (error)
2227 		goto out;
2228 
2229 	send_args(r, lkb, ms);
2230 
2231 	ms->m_result = 0;
2232 
2233 	error = send_message(mh, ms);
2234  out:
2235 	return error;
2236 }
2237 
2238 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2239 {
2240 	struct dlm_message *ms;
2241 	struct dlm_mhandle *mh;
2242 	int to_nodeid, error;
2243 
2244 	to_nodeid = lkb->lkb_nodeid;
2245 
2246 	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2247 	if (error)
2248 		goto out;
2249 
2250 	send_args(r, lkb, ms);
2251 
2252 	ms->m_bastmode = mode;
2253 
2254 	error = send_message(mh, ms);
2255  out:
2256 	return error;
2257 }
2258 
2259 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2260 {
2261 	struct dlm_message *ms;
2262 	struct dlm_mhandle *mh;
2263 	int to_nodeid, error;
2264 
2265 	add_to_waiters(lkb, DLM_MSG_LOOKUP);
2266 
2267 	to_nodeid = dlm_dir_nodeid(r);
2268 
2269 	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2270 	if (error)
2271 		goto fail;
2272 
2273 	send_args(r, lkb, ms);
2274 
2275 	error = send_message(mh, ms);
2276 	if (error)
2277 		goto fail;
2278 	return 0;
2279 
2280  fail:
2281 	remove_from_waiters(lkb);
2282 	return error;
2283 }
2284 
2285 static int send_remove(struct dlm_rsb *r)
2286 {
2287 	struct dlm_message *ms;
2288 	struct dlm_mhandle *mh;
2289 	int to_nodeid, error;
2290 
2291 	to_nodeid = dlm_dir_nodeid(r);
2292 
2293 	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2294 	if (error)
2295 		goto out;
2296 
2297 	memcpy(ms->m_extra, r->res_name, r->res_length);
2298 	ms->m_hash = r->res_hash;
2299 
2300 	error = send_message(mh, ms);
2301  out:
2302 	return error;
2303 }
2304 
2305 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2306 			     int mstype, int rv)
2307 {
2308 	struct dlm_message *ms;
2309 	struct dlm_mhandle *mh;
2310 	int to_nodeid, error;
2311 
2312 	to_nodeid = lkb->lkb_nodeid;
2313 
2314 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2315 	if (error)
2316 		goto out;
2317 
2318 	send_args(r, lkb, ms);
2319 
2320 	ms->m_result = rv;
2321 
2322 	error = send_message(mh, ms);
2323  out:
2324 	return error;
2325 }
2326 
2327 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2328 {
2329 	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2330 }
2331 
2332 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2333 {
2334 	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2335 }
2336 
2337 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2338 {
2339 	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2340 }
2341 
2342 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2343 {
2344 	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2345 }
2346 
2347 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2348 			     int ret_nodeid, int rv)
2349 {
2350 	struct dlm_rsb *r = &ls->ls_stub_rsb;
2351 	struct dlm_message *ms;
2352 	struct dlm_mhandle *mh;
2353 	int error, nodeid = ms_in->m_header.h_nodeid;
2354 
2355 	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2356 	if (error)
2357 		goto out;
2358 
2359 	ms->m_lkid = ms_in->m_lkid;
2360 	ms->m_result = rv;
2361 	ms->m_nodeid = ret_nodeid;
2362 
2363 	error = send_message(mh, ms);
2364  out:
2365 	return error;
2366 }
2367 
2368 /* which args we save from a received message depends heavily on the type
2369    of message, unlike the send side, where we can safely send everything
2370    about the lkb for any type of message */
2371 
2372 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2373 {
2374 	lkb->lkb_exflags = ms->m_exflags;
2375 	lkb->lkb_sbflags = ms->m_sbflags;
2376 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2377 		         (ms->m_flags & 0x0000FFFF);
2378 }
2379 
2380 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2381 {
2382 	lkb->lkb_sbflags = ms->m_sbflags;
2383 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2384 		         (ms->m_flags & 0x0000FFFF);
2385 }
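
/* A note on the 16/16 split above: only the low 16 bits of lkb_flags
   travel in m_flags; the high 16 bits are node-local state (e.g.
   DLM_IFL_MSTCPY, which receive_convert_args() still expects to find
   set after receive_flags() has run), so both helpers preserve them
   across a received message. */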
2386 
2387 static int receive_extralen(struct dlm_message *ms)
2388 {
2389 	return (ms->m_header.h_length - sizeof(struct dlm_message));
2390 }
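
/* Worked example (assumed sizes): if a received DLM_MSG_REQUEST has
   h_length = sizeof(struct dlm_message) + 10, receive_extralen() returns
   10, the length of the resource name that receive_request() passes to
   find_rsb(); for messages carrying an lvb, the same value is the lvb
   length used by receive_lvb(). */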
2391 
2392 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2393 		       struct dlm_message *ms)
2394 {
2395 	int len;
2396 
2397 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2398 		if (!lkb->lkb_lvbptr)
2399 			lkb->lkb_lvbptr = allocate_lvb(ls);
2400 		if (!lkb->lkb_lvbptr)
2401 			return -ENOMEM;
2402 		len = receive_extralen(ms);
2403 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2404 	}
2405 	return 0;
2406 }
2407 
2408 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2409 				struct dlm_message *ms)
2410 {
2411 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
2412 	lkb->lkb_ownpid = ms->m_pid;
2413 	lkb->lkb_remid = ms->m_lkid;
2414 	lkb->lkb_grmode = DLM_LOCK_IV;
2415 	lkb->lkb_rqmode = ms->m_rqmode;
2416 	lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2417 	lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2418 
2419 	DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2420 
2421 	if (receive_lvb(ls, lkb, ms))
2422 		return -ENOMEM;
2423 
2424 	return 0;
2425 }
2426 
2427 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2428 				struct dlm_message *ms)
2429 {
2430 	if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2431 		log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2432 			  lkb->lkb_nodeid, ms->m_header.h_nodeid,
2433 			  lkb->lkb_id, lkb->lkb_remid);
2434 		return -EINVAL;
2435 	}
2436 
2437 	if (!is_master_copy(lkb))
2438 		return -EINVAL;
2439 
2440 	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2441 		return -EBUSY;
2442 
2443 	if (receive_lvb(ls, lkb, ms))
2444 		return -ENOMEM;
2445 
2446 	lkb->lkb_rqmode = ms->m_rqmode;
2447 	lkb->lkb_lvbseq = ms->m_lvbseq;
2448 
2449 	return 0;
2450 }
2451 
2452 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2453 			       struct dlm_message *ms)
2454 {
2455 	if (!is_master_copy(lkb))
2456 		return -EINVAL;
2457 	if (receive_lvb(ls, lkb, ms))
2458 		return -ENOMEM;
2459 	return 0;
2460 }
2461 
2462 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2463    uses to send a reply and that the remote end uses to process the reply. */
2464 
2465 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2466 {
2467 	struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2468 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
2469 	lkb->lkb_remid = ms->m_lkid;
2470 }
2471 
2472 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
2473 {
2474 	struct dlm_lkb *lkb;
2475 	struct dlm_rsb *r;
2476 	int error, namelen;
2477 
2478 	error = create_lkb(ls, &lkb);
2479 	if (error)
2480 		goto fail;
2481 
2482 	receive_flags(lkb, ms);
2483 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
2484 	error = receive_request_args(ls, lkb, ms);
2485 	if (error) {
2486 		__put_lkb(ls, lkb);
2487 		goto fail;
2488 	}
2489 
2490 	namelen = receive_extralen(ms);
2491 
2492 	error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
2493 	if (error) {
2494 		__put_lkb(ls, lkb);
2495 		goto fail;
2496 	}
2497 
2498 	lock_rsb(r);
2499 
2500 	attach_lkb(r, lkb);
2501 	error = do_request(r, lkb);
2502 	send_request_reply(r, lkb, error);
2503 
2504 	unlock_rsb(r);
2505 	put_rsb(r);
2506 
2507 	if (error == -EINPROGRESS)
2508 		error = 0;
2509 	if (error)
2510 		dlm_put_lkb(lkb);
2511 	return;
2512 
2513  fail:
2514 	setup_stub_lkb(ls, ms);
2515 	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2516 }
2517 
2518 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
2519 {
2520 	struct dlm_lkb *lkb;
2521 	struct dlm_rsb *r;
2522 	int error, reply = 1;
2523 
2524 	error = find_lkb(ls, ms->m_remid, &lkb);
2525 	if (error)
2526 		goto fail;
2527 
2528 	r = lkb->lkb_resource;
2529 
2530 	hold_rsb(r);
2531 	lock_rsb(r);
2532 
2533 	receive_flags(lkb, ms);
2534 	error = receive_convert_args(ls, lkb, ms);
2535 	if (error)
2536 		goto out;
2537 	reply = !down_conversion(lkb);
2538 
2539 	error = do_convert(r, lkb);
2540  out:
2541 	if (reply)
2542 		send_convert_reply(r, lkb, error);
2543 
2544 	unlock_rsb(r);
2545 	put_rsb(r);
2546 	dlm_put_lkb(lkb);
2547 	return;
2548 
2549  fail:
2550 	setup_stub_lkb(ls, ms);
2551 	send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2552 }
2553 
2554 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
2555 {
2556 	struct dlm_lkb *lkb;
2557 	struct dlm_rsb *r;
2558 	int error;
2559 
2560 	error = find_lkb(ls, ms->m_remid, &lkb);
2561 	if (error)
2562 		goto fail;
2563 
2564 	r = lkb->lkb_resource;
2565 
2566 	hold_rsb(r);
2567 	lock_rsb(r);
2568 
2569 	receive_flags(lkb, ms);
2570 	error = receive_unlock_args(ls, lkb, ms);
2571 	if (error)
2572 		goto out;
2573 
2574 	error = do_unlock(r, lkb);
2575  out:
2576 	send_unlock_reply(r, lkb, error);
2577 
2578 	unlock_rsb(r);
2579 	put_rsb(r);
2580 	dlm_put_lkb(lkb);
2581 	return;
2582 
2583  fail:
2584 	setup_stub_lkb(ls, ms);
2585 	send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2586 }
2587 
2588 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
2589 {
2590 	struct dlm_lkb *lkb;
2591 	struct dlm_rsb *r;
2592 	int error;
2593 
2594 	error = find_lkb(ls, ms->m_remid, &lkb);
2595 	if (error)
2596 		goto fail;
2597 
2598 	receive_flags(lkb, ms);
2599 
2600 	r = lkb->lkb_resource;
2601 
2602 	hold_rsb(r);
2603 	lock_rsb(r);
2604 
2605 	error = do_cancel(r, lkb);
2606 	send_cancel_reply(r, lkb, error);
2607 
2608 	unlock_rsb(r);
2609 	put_rsb(r);
2610 	dlm_put_lkb(lkb);
2611 	return;
2612 
2613  fail:
2614 	setup_stub_lkb(ls, ms);
2615 	send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
2616 }
2617 
2618 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2619 {
2620 	struct dlm_lkb *lkb;
2621 	struct dlm_rsb *r;
2622 	int error;
2623 
2624 	error = find_lkb(ls, ms->m_remid, &lkb);
2625 	if (error) {
2626 		log_error(ls, "receive_grant no lkb");
2627 		return;
2628 	}
2629 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2630 
2631 	r = lkb->lkb_resource;
2632 
2633 	hold_rsb(r);
2634 	lock_rsb(r);
2635 
2636 	receive_flags_reply(lkb, ms);
2637 	grant_lock_pc(r, lkb, ms);
2638 	queue_cast(r, lkb, 0);
2639 
2640 	unlock_rsb(r);
2641 	put_rsb(r);
2642 	dlm_put_lkb(lkb);
2643 }
2644 
2645 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
2646 {
2647 	struct dlm_lkb *lkb;
2648 	struct dlm_rsb *r;
2649 	int error;
2650 
2651 	error = find_lkb(ls, ms->m_remid, &lkb);
2652 	if (error) {
2653 		log_error(ls, "receive_bast no lkb");
2654 		return;
2655 	}
2656 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2657 
2658 	r = lkb->lkb_resource;
2659 
2660 	hold_rsb(r);
2661 	lock_rsb(r);
2662 
2663 	queue_bast(r, lkb, ms->m_bastmode);
2664 
2665 	unlock_rsb(r);
2666 	put_rsb(r);
2667 	dlm_put_lkb(lkb);
2668 }
2669 
2670 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
2671 {
2672 	int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
2673 
2674 	from_nodeid = ms->m_header.h_nodeid;
2675 	our_nodeid = dlm_our_nodeid();
2676 
2677 	len = receive_extralen(ms);
2678 
2679 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2680 	if (dir_nodeid != our_nodeid) {
2681 		log_error(ls, "lookup dir_nodeid %d from %d",
2682 			  dir_nodeid, from_nodeid);
2683 		error = -EINVAL;
2684 		ret_nodeid = -1;
2685 		goto out;
2686 	}
2687 
2688 	error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
2689 
2690 	/* Optimization: we're master so treat lookup as a request */
2691 	if (!error && ret_nodeid == our_nodeid) {
2692 		receive_request(ls, ms);
2693 		return;
2694 	}
2695  out:
2696 	send_lookup_reply(ls, ms, ret_nodeid, error);
2697 }
2698 
2699 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
2700 {
2701 	int len, dir_nodeid, from_nodeid;
2702 
2703 	from_nodeid = ms->m_header.h_nodeid;
2704 
2705 	len = receive_extralen(ms);
2706 
2707 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2708 	if (dir_nodeid != dlm_our_nodeid()) {
2709 		log_error(ls, "remove dir entry dir_nodeid %d from %d",
2710 			  dir_nodeid, from_nodeid);
2711 		return;
2712 	}
2713 
2714 	dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
2715 }
2716 
2717 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
2718 {
2719 	struct dlm_lkb *lkb;
2720 	struct dlm_rsb *r;
2721 	int error, mstype;
2722 
2723 	error = find_lkb(ls, ms->m_remid, &lkb);
2724 	if (error) {
2725 		log_error(ls, "receive_request_reply no lkb");
2726 		return;
2727 	}
2728 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2729 
2730 	mstype = lkb->lkb_wait_type;
2731 	error = remove_from_waiters(lkb);
2732 	if (error) {
2733 		log_error(ls, "receive_request_reply not on waiters");
2734 		goto out;
2735 	}
2736 
2737 	/* this is the value returned from do_request() on the master */
2738 	error = ms->m_result;
2739 
2740 	r = lkb->lkb_resource;
2741 	hold_rsb(r);
2742 	lock_rsb(r);
2743 
2744 	/* Optimization: the dir node was also the master, so it took our
2745 	   lookup as a request and sent a request reply instead of a lookup reply */
2746 	if (mstype == DLM_MSG_LOOKUP) {
2747 		r->res_nodeid = ms->m_header.h_nodeid;
2748 		lkb->lkb_nodeid = r->res_nodeid;
2749 	}
2750 
2751 	switch (error) {
2752 	case -EAGAIN:
2753 		/* request would block (be queued) on remote master;
2754 		   the unhold undoes the original ref from create_lkb()
2755 		   so it leads to the lkb being freed */
2756 		queue_cast(r, lkb, -EAGAIN);
2757 		confirm_master(r, -EAGAIN);
2758 		unhold_lkb(lkb);
2759 		break;
2760 
2761 	case -EINPROGRESS:
2762 	case 0:
2763 		/* request was queued or granted on remote master */
2764 		receive_flags_reply(lkb, ms);
2765 		lkb->lkb_remid = ms->m_lkid;
2766 		if (error)
2767 			add_lkb(r, lkb, DLM_LKSTS_WAITING);
2768 		else {
2769 			grant_lock_pc(r, lkb, ms);
2770 			queue_cast(r, lkb, 0);
2771 		}
2772 		confirm_master(r, error);
2773 		break;
2774 
2775 	case -EBADR:
2776 	case -ENOTBLK:
2777 		/* find_rsb failed to find rsb or rsb wasn't master */
2778 		r->res_nodeid = -1;
2779 		lkb->lkb_nodeid = -1;
2780 		_request_lock(r, lkb);
2781 		break;
2782 
2783 	default:
2784 		log_error(ls, "receive_request_reply error %d", error);
2785 	}
2786 
2787 	unlock_rsb(r);
2788 	put_rsb(r);
2789  out:
2790 	dlm_put_lkb(lkb);
2791 }
2792 
2793 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2794 				    struct dlm_message *ms)
2795 {
2796 	int error = ms->m_result;
2797 
2798 	/* this is the value returned from do_convert() on the master */
2799 
2800 	switch (error) {
2801 	case -EAGAIN:
2802 		/* convert would block (be queued) on remote master */
2803 		queue_cast(r, lkb, -EAGAIN);
2804 		break;
2805 
2806 	case -EINPROGRESS:
2807 		/* convert was queued on remote master */
2808 		del_lkb(r, lkb);
2809 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2810 		break;
2811 
2812 	case 0:
2813 		/* convert was granted on remote master */
2814 		receive_flags_reply(lkb, ms);
2815 		grant_lock_pc(r, lkb, ms);
2816 		queue_cast(r, lkb, 0);
2817 		break;
2818 
2819 	default:
2820 		log_error(r->res_ls, "receive_convert_reply error %d", error);
2821 	}
2822 }
2823 
2824 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2825 {
2826 	struct dlm_rsb *r = lkb->lkb_resource;
2827 
2828 	hold_rsb(r);
2829 	lock_rsb(r);
2830 
2831 	__receive_convert_reply(r, lkb, ms);
2832 
2833 	unlock_rsb(r);
2834 	put_rsb(r);
2835 }
2836 
2837 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
2838 {
2839 	struct dlm_lkb *lkb;
2840 	int error;
2841 
2842 	error = find_lkb(ls, ms->m_remid, &lkb);
2843 	if (error) {
2844 		log_error(ls, "receive_convert_reply no lkb");
2845 		return;
2846 	}
2847 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2848 
2849 	error = remove_from_waiters(lkb);
2850 	if (error) {
2851 		log_error(ls, "receive_convert_reply not on waiters");
2852 		goto out;
2853 	}
2854 
2855 	_receive_convert_reply(lkb, ms);
2856  out:
2857 	dlm_put_lkb(lkb);
2858 }
2859 
2860 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2861 {
2862 	struct dlm_rsb *r = lkb->lkb_resource;
2863 	int error = ms->m_result;
2864 
2865 	hold_rsb(r);
2866 	lock_rsb(r);
2867 
2868 	/* this is the value returned from do_unlock() on the master */
2869 
2870 	switch (error) {
2871 	case -DLM_EUNLOCK:
2872 		receive_flags_reply(lkb, ms);
2873 		remove_lock_pc(r, lkb);
2874 		queue_cast(r, lkb, -DLM_EUNLOCK);
2875 		break;
2876 	default:
2877 		log_error(r->res_ls, "receive_unlock_reply error %d", error);
2878 	}
2879 
2880 	unlock_rsb(r);
2881 	put_rsb(r);
2882 }
2883 
2884 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
2885 {
2886 	struct dlm_lkb *lkb;
2887 	int error;
2888 
2889 	error = find_lkb(ls, ms->m_remid, &lkb);
2890 	if (error) {
2891 		log_error(ls, "receive_unlock_reply no lkb");
2892 		return;
2893 	}
2894 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2895 
2896 	error = remove_from_waiters(lkb);
2897 	if (error) {
2898 		log_error(ls, "receive_unlock_reply not on waiters");
2899 		goto out;
2900 	}
2901 
2902 	_receive_unlock_reply(lkb, ms);
2903  out:
2904 	dlm_put_lkb(lkb);
2905 }
2906 
2907 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2908 {
2909 	struct dlm_rsb *r = lkb->lkb_resource;
2910 	int error = ms->m_result;
2911 
2912 	hold_rsb(r);
2913 	lock_rsb(r);
2914 
2915 	/* this is the value returned from do_cancel() on the master */
2916 
2917 	switch (error) {
2918 	case -DLM_ECANCEL:
2919 		receive_flags_reply(lkb, ms);
2920 		revert_lock_pc(r, lkb);
2921 		queue_cast(r, lkb, -DLM_ECANCEL);
2922 		break;
2923 	default:
2924 		log_error(r->res_ls, "receive_cancel_reply error %d", error);
2925 	}
2926 
2927 	unlock_rsb(r);
2928 	put_rsb(r);
2929 }
2930 
2931 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
2932 {
2933 	struct dlm_lkb *lkb;
2934 	int error;
2935 
2936 	error = find_lkb(ls, ms->m_remid, &lkb);
2937 	if (error) {
2938 		log_error(ls, "receive_cancel_reply no lkb");
2939 		return;
2940 	}
2941 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2942 
2943 	error = remove_from_waiters(lkb);
2944 	if (error) {
2945 		log_error(ls, "receive_cancel_reply not on waiters");
2946 		goto out;
2947 	}
2948 
2949 	_receive_cancel_reply(lkb, ms);
2950  out:
2951 	dlm_put_lkb(lkb);
2952 }
2953 
2954 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
2955 {
2956 	struct dlm_lkb *lkb;
2957 	struct dlm_rsb *r;
2958 	int error, ret_nodeid;
2959 
2960 	error = find_lkb(ls, ms->m_lkid, &lkb);
2961 	if (error) {
2962 		log_error(ls, "receive_lookup_reply no lkb");
2963 		return;
2964 	}
2965 
2966 	error = remove_from_waiters(lkb);
2967 	if (error) {
2968 		log_error(ls, "receive_lookup_reply not on waiters");
2969 		goto out;
2970 	}
2971 
2972 	/* this is the value returned by dlm_dir_lookup on dir node
2973 	   FIXME: will a non-zero error ever be returned? */
2974 	error = ms->m_result;
2975 
2976 	r = lkb->lkb_resource;
2977 	hold_rsb(r);
2978 	lock_rsb(r);
2979 
2980 	ret_nodeid = ms->m_nodeid;
2981 	if (ret_nodeid == dlm_our_nodeid()) {
2982 		r->res_nodeid = 0;
2983 		ret_nodeid = 0;
2984 		r->res_first_lkid = 0;
2985 	} else {
2986 		/* set_master() will copy res_nodeid to lkb_nodeid */
2987 		r->res_nodeid = ret_nodeid;
2988 	}
2989 
2990 	_request_lock(r, lkb);
2991 
2992 	if (!ret_nodeid)
2993 		process_lookup_list(r);
2994 
2995 	unlock_rsb(r);
2996 	put_rsb(r);
2997  out:
2998 	dlm_put_lkb(lkb);
2999 }
3000 
3001 int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3002 {
3003 	struct dlm_message *ms = (struct dlm_message *) hd;
3004 	struct dlm_ls *ls;
3005 	int error;
3006 
3007 	if (!recovery)
3008 		dlm_message_in(ms);
3009 
3010 	ls = dlm_find_lockspace_global(hd->h_lockspace);
3011 	if (!ls) {
3012 		log_print("drop message %d from %d for unknown lockspace %d",
3013 			  ms->m_type, nodeid, hd->h_lockspace);
3014 		return -EINVAL;
3015 	}
3016 
3017 	/* recovery may have just ended leaving a bunch of backed-up requests
3018 	   in the requestqueue; wait while dlm_recoverd clears them */
3019 
3020 	if (!recovery)
3021 		dlm_wait_requestqueue(ls);
3022 
3023 	/* recovery may have just started while there were a bunch of
3024 	   in-flight requests -- save them in requestqueue to be processed
3025 	   after recovery.  we can't let dlm_recvd block on the recovery
3026 	   lock.  if dlm_recoverd is calling this function to clear the
3027 	   requestqueue, it needs to be interrupted (-EINTR) if another
3028 	   recovery operation is starting. */
3029 
3030 	while (1) {
3031 		if (dlm_locking_stopped(ls)) {
3032 			if (recovery) {
3033 				error = -EINTR;
3034 				goto out;
3035 			}
3036 			error = dlm_add_requestqueue(ls, nodeid, hd);
3037 			if (error == -EAGAIN)
3038 				continue;
3039 			else {
3040 				error = -EINTR;
3041 				goto out;
3042 			}
3043 		}
3044 
3045 		if (lock_recovery_try(ls))
3046 			break;
3047 		schedule();
3048 	}
3049 
3050 	switch (ms->m_type) {
3051 
3052 	/* messages sent to a master node */
3053 
3054 	case DLM_MSG_REQUEST:
3055 		receive_request(ls, ms);
3056 		break;
3057 
3058 	case DLM_MSG_CONVERT:
3059 		receive_convert(ls, ms);
3060 		break;
3061 
3062 	case DLM_MSG_UNLOCK:
3063 		receive_unlock(ls, ms);
3064 		break;
3065 
3066 	case DLM_MSG_CANCEL:
3067 		receive_cancel(ls, ms);
3068 		break;
3069 
3070 	/* messages sent from a master node (replies to above) */
3071 
3072 	case DLM_MSG_REQUEST_REPLY:
3073 		receive_request_reply(ls, ms);
3074 		break;
3075 
3076 	case DLM_MSG_CONVERT_REPLY:
3077 		receive_convert_reply(ls, ms);
3078 		break;
3079 
3080 	case DLM_MSG_UNLOCK_REPLY:
3081 		receive_unlock_reply(ls, ms);
3082 		break;
3083 
3084 	case DLM_MSG_CANCEL_REPLY:
3085 		receive_cancel_reply(ls, ms);
3086 		break;
3087 
3088 	/* messages sent from a master node (only two types of async msg) */
3089 
3090 	case DLM_MSG_GRANT:
3091 		receive_grant(ls, ms);
3092 		break;
3093 
3094 	case DLM_MSG_BAST:
3095 		receive_bast(ls, ms);
3096 		break;
3097 
3098 	/* messages sent to a dir node */
3099 
3100 	case DLM_MSG_LOOKUP:
3101 		receive_lookup(ls, ms);
3102 		break;
3103 
3104 	case DLM_MSG_REMOVE:
3105 		receive_remove(ls, ms);
3106 		break;
3107 
3108 	/* messages sent from a dir node (remove has no reply) */
3109 
3110 	case DLM_MSG_LOOKUP_REPLY:
3111 		receive_lookup_reply(ls, ms);
3112 		break;
3113 
3114 	default:
3115 		log_error(ls, "unknown message type %d", ms->m_type);
3116 	}
3117 
3118 	unlock_recovery(ls);
3119  out:
3120 	dlm_put_lockspace(ls);
3121 	dlm_astd_wake();
3122 	return 0;
3123 }
3124 
3125 
3126 /*
3127  * Recovery related
3128  */
3129 
3130 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3131 {
3132 	if (middle_conversion(lkb)) {
3133 		hold_lkb(lkb);
3134 		ls->ls_stub_ms.m_result = -EINPROGRESS;
3135 		_remove_from_waiters(lkb);
3136 		_receive_convert_reply(lkb, &ls->ls_stub_ms);
3137 
3138 		/* Same special case as in receive_rcom_lock_args() */
3139 		lkb->lkb_grmode = DLM_LOCK_IV;
3140 		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3141 		unhold_lkb(lkb);
3142 
3143 	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3144 		lkb->lkb_flags |= DLM_IFL_RESEND;
3145 	}
3146 
3147 	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3148 	   conversions are async; there's no reply from the remote master */
3149 }
3150 
3151 /* A waiting lkb needs recovery if the master node has failed, or
3152    the master node is changing (only when no directory is used) */
3153 
3154 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3155 {
3156 	if (dlm_is_removed(ls, lkb->lkb_nodeid))
3157 		return 1;
3158 
3159 	if (!dlm_no_directory(ls))
3160 		return 0;
3161 
3162 	if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3163 		return 1;
3164 
3165 	return 0;
3166 }
3167 
3168 /* Recovery for locks that are waiting for replies from nodes that are now
3169    gone.  We can just complete unlocks and cancels by faking a reply from the
3170    dead node.  Requests and up-conversions we flag to be resent after
3171    recovery.  Down-conversions can just be completed with a fake reply like
3172    unlocks.  Conversions between PR and CW need special attention. */
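
/* In sketch form, the per-wait-type actions taken below (the stub
   replies use ls_stub_ms with a faked m_result):

	DLM_MSG_LOOKUP   -> flag DLM_IFL_RESEND, resend after recovery
	DLM_MSG_REQUEST  -> flag DLM_IFL_RESEND, resend after recovery
	DLM_MSG_CONVERT  -> recover_convert_waiter(): fake -EINPROGRESS
			    reply for middle conversions, else RESEND
	DLM_MSG_UNLOCK   -> fake -DLM_EUNLOCK reply; the lock is removed
	DLM_MSG_CANCEL   -> fake -DLM_ECANCEL reply; the lock is reverted */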
3173 
3174 void dlm_recover_waiters_pre(struct dlm_ls *ls)
3175 {
3176 	struct dlm_lkb *lkb, *safe;
3177 
3178 	mutex_lock(&ls->ls_waiters_mutex);
3179 
3180 	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3181 		log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3182 			  lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3183 
3184 		/* all outstanding lookups, regardless of destination, will be
3185 		   resent after recovery is done */
3186 
3187 		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3188 			lkb->lkb_flags |= DLM_IFL_RESEND;
3189 			continue;
3190 		}
3191 
3192 		if (!waiter_needs_recovery(ls, lkb))
3193 			continue;
3194 
3195 		switch (lkb->lkb_wait_type) {
3196 
3197 		case DLM_MSG_REQUEST:
3198 			lkb->lkb_flags |= DLM_IFL_RESEND;
3199 			break;
3200 
3201 		case DLM_MSG_CONVERT:
3202 			recover_convert_waiter(ls, lkb);
3203 			break;
3204 
3205 		case DLM_MSG_UNLOCK:
3206 			hold_lkb(lkb);
3207 			ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
3208 			_remove_from_waiters(lkb);
3209 			_receive_unlock_reply(lkb, &ls->ls_stub_ms);
3210 			dlm_put_lkb(lkb);
3211 			break;
3212 
3213 		case DLM_MSG_CANCEL:
3214 			hold_lkb(lkb);
3215 			ls->ls_stub_ms.m_result = -DLM_ECANCEL;
3216 			_remove_from_waiters(lkb);
3217 			_receive_cancel_reply(lkb, &ls->ls_stub_ms);
3218 			dlm_put_lkb(lkb);
3219 			break;
3220 
3221 		default:
3222 			log_error(ls, "invalid lkb wait_type %d",
3223 				  lkb->lkb_wait_type);
3224 		}
3225 		schedule();
3226 	}
3227 	mutex_unlock(&ls->ls_waiters_mutex);
3228 }
3229 
3230 static int remove_resend_waiter(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
3231 {
3232 	struct dlm_lkb *lkb;
3233 	int rv = 0;
3234 
3235 	mutex_lock(&ls->ls_waiters_mutex);
3236 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3237 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
3238 			rv = lkb->lkb_wait_type;
3239 			_remove_from_waiters(lkb);
3240 			lkb->lkb_flags &= ~DLM_IFL_RESEND;
3241 			break;
3242 		}
3243 	}
3244 	mutex_unlock(&ls->ls_waiters_mutex);
3245 
3246 	if (!rv)
3247 		lkb = NULL;
3248 	*lkb_ret = lkb;
3249 	return rv;
3250 }
3251 
3252 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
3253    master or dir-node for r.  Processing the lkb may result in it being placed
3254    back on waiters. */
3255 
3256 int dlm_recover_waiters_post(struct dlm_ls *ls)
3257 {
3258 	struct dlm_lkb *lkb;
3259 	struct dlm_rsb *r;
3260 	int error = 0, mstype;
3261 
3262 	while (1) {
3263 		if (dlm_locking_stopped(ls)) {
3264 			log_debug(ls, "recover_waiters_post aborted");
3265 			error = -EINTR;
3266 			break;
3267 		}
3268 
3269 		mstype = remove_resend_waiter(ls, &lkb);
3270 		if (!mstype)
3271 			break;
3272 
3273 		r = lkb->lkb_resource;
3274 
3275 		log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3276 			  lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3277 
3278 		switch (mstype) {
3279 
3280 		case DLM_MSG_LOOKUP:
3281 			hold_rsb(r);
3282 			lock_rsb(r);
3283 			_request_lock(r, lkb);
3284 			if (is_master(r))
3285 				confirm_master(r, 0);
3286 			unlock_rsb(r);
3287 			put_rsb(r);
3288 			break;
3289 
3290 		case DLM_MSG_REQUEST:
3291 			hold_rsb(r);
3292 			lock_rsb(r);
3293 			_request_lock(r, lkb);
3294 			if (is_master(r))
3295 				confirm_master(r, 0);
3296 			unlock_rsb(r);
3297 			put_rsb(r);
3298 			break;
3299 
3300 		case DLM_MSG_CONVERT:
3301 			hold_rsb(r);
3302 			lock_rsb(r);
3303 			_convert_lock(r, lkb);
3304 			unlock_rsb(r);
3305 			put_rsb(r);
3306 			break;
3307 
3308 		default:
3309 			log_error(ls, "recover_waiters_post type %d", mstype);
3310 		}
3311 	}
3312 
3313 	return error;
3314 }
3315 
3316 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3317 			int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3318 {
3319 	struct dlm_ls *ls = r->res_ls;
3320 	struct dlm_lkb *lkb, *safe;
3321 
3322 	list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3323 		if (test(ls, lkb)) {
3324 			rsb_set_flag(r, RSB_LOCKS_PURGED);
3325 			del_lkb(r, lkb);
3326 			/* this put should free the lkb */
3327 			if (!dlm_put_lkb(lkb))
3328 				log_error(ls, "purged lkb not released");
3329 		}
3330 	}
3331 }
3332 
3333 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3334 {
3335 	return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3336 }
3337 
3338 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3339 {
3340 	return is_master_copy(lkb);
3341 }
3342 
3343 static void purge_dead_locks(struct dlm_rsb *r)
3344 {
3345 	purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3346 	purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3347 	purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3348 }
3349 
3350 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3351 {
3352 	purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3353 	purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3354 	purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3355 }
3356 
3357 /* Get rid of locks held by nodes that are gone. */
3358 
3359 int dlm_purge_locks(struct dlm_ls *ls)
3360 {
3361 	struct dlm_rsb *r;
3362 
3363 	log_debug(ls, "dlm_purge_locks");
3364 
3365 	down_write(&ls->ls_root_sem);
3366 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
3367 		hold_rsb(r);
3368 		lock_rsb(r);
3369 		if (is_master(r))
3370 			purge_dead_locks(r);
3371 		unlock_rsb(r);
3372 		unhold_rsb(r);
3373 
3374 		schedule();
3375 	}
3376 	up_write(&ls->ls_root_sem);
3377 
3378 	return 0;
3379 }
3380 
3381 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
3382 {
3383 	struct dlm_rsb *r, *r_ret = NULL;
3384 
3385 	read_lock(&ls->ls_rsbtbl[bucket].lock);
3386 	list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
3387 		if (!rsb_flag(r, RSB_LOCKS_PURGED))
3388 			continue;
3389 		hold_rsb(r);
3390 		rsb_clear_flag(r, RSB_LOCKS_PURGED);
3391 		r_ret = r;
3392 		break;
3393 	}
3394 	read_unlock(&ls->ls_rsbtbl[bucket].lock);
3395 	return r_ret;
3396 }
3397 
3398 void dlm_grant_after_purge(struct dlm_ls *ls)
3399 {
3400 	struct dlm_rsb *r;
3401 	int bucket = 0;
3402 
3403 	while (1) {
3404 		r = find_purged_rsb(ls, bucket);
3405 		if (!r) {
3406 			if (bucket == ls->ls_rsbtbl_size - 1)
3407 				break;
3408 			bucket++;
3409 			continue;
3410 		}
3411 		lock_rsb(r);
3412 		if (is_master(r)) {
3413 			grant_pending_locks(r);
3414 			confirm_master(r, 0);
3415 		}
3416 		unlock_rsb(r);
3417 		put_rsb(r);
3418 		schedule();
3419 	}
3420 }
3421 
3422 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3423 					 uint32_t remid)
3424 {
3425 	struct dlm_lkb *lkb;
3426 
3427 	list_for_each_entry(lkb, head, lkb_statequeue) {
3428 		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
3429 			return lkb;
3430 	}
3431 	return NULL;
3432 }
3433 
3434 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3435 				    uint32_t remid)
3436 {
3437 	struct dlm_lkb *lkb;
3438 
3439 	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3440 	if (lkb)
3441 		return lkb;
3442 	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3443 	if (lkb)
3444 		return lkb;
3445 	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
3446 	if (lkb)
3447 		return lkb;
3448 	return NULL;
3449 }
3450 
3451 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3452 				  struct dlm_rsb *r, struct dlm_rcom *rc)
3453 {
3454 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3455 	int lvblen;
3456 
3457 	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
3458 	lkb->lkb_ownpid = rl->rl_ownpid;
3459 	lkb->lkb_remid = rl->rl_lkid;
3460 	lkb->lkb_exflags = rl->rl_exflags;
3461 	lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
3462 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
3463 	lkb->lkb_lvbseq = rl->rl_lvbseq;
3464 	lkb->lkb_rqmode = rl->rl_rqmode;
3465 	lkb->lkb_grmode = rl->rl_grmode;
3466 	/* don't set lkb_status because add_lkb wants to set it itself */
3467 
3468 	lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
3469 	lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
3470 
3471 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3472 		lkb->lkb_lvbptr = allocate_lvb(ls);
3473 		if (!lkb->lkb_lvbptr)
3474 			return -ENOMEM;
3475 		lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
3476 			 sizeof(struct rcom_lock);
3477 		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
3478 	}
3479 
3480 	/* Conversions between PR and CW (middle modes) need special handling.
3481 	   The real granted mode of these converting locks cannot be determined
3482 	   until all locks have been rebuilt on the rsb (recover_conversion) */
3483 
3484 	if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
3485 		rl->rl_status = DLM_LKSTS_CONVERT;
3486 		lkb->lkb_grmode = DLM_LOCK_IV;
3487 		rsb_set_flag(r, RSB_RECOVER_CONVERT);
3488 	}
3489 
3490 	return 0;
3491 }
3492 
3493 /* This lkb may have been recovered in a previous aborted recovery so we need
3494    to check if the rsb already has an lkb with the given remote nodeid/lkid.
3495    If so we just send back a standard reply.  If not, we create a new lkb with
3496    the given values and send back our lkid.  We send back our lkid by sending
3497    back the rcom_lock struct we got but with the remid field filled in. */
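
/* Handshake sketch: the lock holder sends rcom_lock with rl_lkid set to
   its own lkb_id; the new master fills in rl_remid with the master-copy
   lkb_id (at out_remid below) and sends the struct back;
   dlm_recover_process_copy() then saves rl_remid in the process-copy
   lkb_remid, re-linking the two halves of the lock. */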
3498 
3499 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3500 {
3501 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3502 	struct dlm_rsb *r;
3503 	struct dlm_lkb *lkb;
3504 	int error;
3505 
3506 	if (rl->rl_parent_lkid) {
3507 		error = -EOPNOTSUPP;
3508 		goto out;
3509 	}
3510 
3511 	error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
3512 	if (error)
3513 		goto out;
3514 
3515 	lock_rsb(r);
3516 
3517 	lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
3518 	if (lkb) {
3519 		error = -EEXIST;
3520 		goto out_remid;
3521 	}
3522 
3523 	error = create_lkb(ls, &lkb);
3524 	if (error)
3525 		goto out_unlock;
3526 
3527 	error = receive_rcom_lock_args(ls, lkb, r, rc);
3528 	if (error) {
3529 		__put_lkb(ls, lkb);
3530 		goto out_unlock;
3531 	}
3532 
3533 	attach_lkb(r, lkb);
3534 	add_lkb(r, lkb, rl->rl_status);
3535 	error = 0;
3536 
3537  out_remid:
3538 	/* this is the new value returned to the lock holder for
3539 	   saving in its process-copy lkb */
3540 	rl->rl_remid = lkb->lkb_id;
3541 
3542  out_unlock:
3543 	unlock_rsb(r);
3544 	put_rsb(r);
3545  out:
3546 	if (error)
3547 		log_print("recover_master_copy %d %x", error, rl->rl_lkid);
3548 	rl->rl_result = error;
3549 	return error;
3550 }
3551 
3552 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
3553 {
3554 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
3555 	struct dlm_rsb *r;
3556 	struct dlm_lkb *lkb;
3557 	int error;
3558 
3559 	error = find_lkb(ls, rl->rl_lkid, &lkb);
3560 	if (error) {
3561 		log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
3562 		return error;
3563 	}
3564 
3565 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3566 
3567 	error = rl->rl_result;
3568 
3569 	r = lkb->lkb_resource;
3570 	hold_rsb(r);
3571 	lock_rsb(r);
3572 
3573 	switch (error) {
3574 	case -EEXIST:
3575 		log_debug(ls, "master copy exists %x", lkb->lkb_id);
3576 		/* fall through */
3577 	case 0:
3578 		lkb->lkb_remid = rl->rl_remid;
3579 		break;
3580 	default:
3581 		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
3582 			  error, lkb->lkb_id);
3583 	}
3584 
3585 	/* an ack for dlm_recover_locks() which waits for replies from
3586 	   all the locks it sends to new masters */
3587 	dlm_recovered_lock(r);
3588 
3589 	unlock_rsb(r);
3590 	put_rsb(r);
3591 	dlm_put_lkb(lkb);
3592 
3593 	return 0;
3594 }
3595 
3596 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
3597 		     int mode, uint32_t flags, void *name, unsigned int namelen,
3598 		     uint32_t parent_lkid)
3599 {
3600 	struct dlm_lkb *lkb;
3601 	struct dlm_args args;
3602 	int error;
3603 
3604 	lock_recovery(ls);
3605 
3606 	error = create_lkb(ls, &lkb);
3607 	if (error) {
3608 		kfree(ua);
3609 		goto out;
3610 	}
3611 
3612 	if (flags & DLM_LKF_VALBLK) {
3613 		ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
3614 		if (!ua->lksb.sb_lvbptr) {
3615 			kfree(ua);
3616 			__put_lkb(ls, lkb);
3617 			error = -ENOMEM;
3618 			goto out;
3619 		}
3620 	}
3621 
3622 	/* After ua is attached to lkb it will be freed by free_lkb().
3623 	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
3624 	   lock and that lkb_astparam is the dlm_user_args structure. */
3625 
3626 	error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
3627 			      DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
3628 	lkb->lkb_flags |= DLM_IFL_USER;
3629 	ua->old_mode = DLM_LOCK_IV;
3630 
3631 	if (error) {
3632 		__put_lkb(ls, lkb);
3633 		goto out;
3634 	}
3635 
3636 	error = request_lock(ls, lkb, name, namelen, &args);
3637 
3638 	switch (error) {
3639 	case 0:
3640 		break;
3641 	case -EINPROGRESS:
3642 		error = 0;
3643 		break;
3644 	case -EAGAIN:
3645 		error = 0;
3646 		/* fall through */
3647 	default:
3648 		__put_lkb(ls, lkb);
3649 		goto out;
3650 	}
3651 
3652 	/* add this new lkb to the per-process list of locks */
3653 	spin_lock(&ua->proc->locks_spin);
3654 	kref_get(&lkb->lkb_ref);
3655 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
3656 	spin_unlock(&ua->proc->locks_spin);
3657  out:
3658 	unlock_recovery(ls);
3659 	return error;
3660 }
3661 
3662 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3663 		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
3664 {
3665 	struct dlm_lkb *lkb;
3666 	struct dlm_args args;
3667 	struct dlm_user_args *ua;
3668 	int error;
3669 
3670 	lock_recovery(ls);
3671 
3672 	error = find_lkb(ls, lkid, &lkb);
3673 	if (error)
3674 		goto out;
3675 
3676 	/* user can change the params on its lock when it converts it, or
3677 	   add an lvb that didn't exist before */
3678 
3679 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
3680 
3681 	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
3682 		ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
3683 		if (!ua->lksb.sb_lvbptr) {
3684 			error = -ENOMEM;
3685 			goto out_put;
3686 		}
3687 	}
3688 	if (lvb_in && ua->lksb.sb_lvbptr)
3689 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
3690 
3691 	ua->castparam = ua_tmp->castparam;
3692 	ua->castaddr = ua_tmp->castaddr;
3693 	ua->bastparam = ua_tmp->bastparam;
3694 	ua->bastaddr = ua_tmp->bastaddr;
3695 	ua->user_lksb = ua_tmp->user_lksb;
3696 	ua->old_mode = lkb->lkb_grmode;
3697 
3698 	error = set_lock_args(mode, &ua->lksb, flags, 0, 0, DLM_FAKE_USER_AST,
3699 			      ua, DLM_FAKE_USER_AST, &args);
3700 	if (error)
3701 		goto out_put;
3702 
3703 	error = convert_lock(ls, lkb, &args);
3704 
3705 	if (error == -EINPROGRESS || error == -EAGAIN)
3706 		error = 0;
3707  out_put:
3708 	dlm_put_lkb(lkb);
3709  out:
3710 	unlock_recovery(ls);
3711 	kfree(ua_tmp);
3712 	return error;
3713 }
3714 
3715 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3716 		    uint32_t flags, uint32_t lkid, char *lvb_in)
3717 {
3718 	struct dlm_lkb *lkb;
3719 	struct dlm_args args;
3720 	struct dlm_user_args *ua;
3721 	int error;
3722 
3723 	lock_recovery(ls);
3724 
3725 	error = find_lkb(ls, lkid, &lkb);
3726 	if (error)
3727 		goto out;
3728 
3729 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
3730 
3731 	if (lvb_in && ua->lksb.sb_lvbptr)
3732 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
3733 	ua->castparam = ua_tmp->castparam;
3734 	ua->user_lksb = ua_tmp->user_lksb;
3735 
3736 	error = set_unlock_args(flags, ua, &args);
3737 	if (error)
3738 		goto out_put;
3739 
3740 	error = unlock_lock(ls, lkb, &args);
3741 
3742 	if (error == -DLM_EUNLOCK)
3743 		error = 0;
3744 	if (error)
3745 		goto out_put;
3746 
3747 	spin_lock(&ua->proc->locks_spin);
3748 	list_del_init(&lkb->lkb_ownqueue);
3749 	spin_unlock(&ua->proc->locks_spin);
3750 
3751 	/* this removes the reference for the proc->locks list added by
3752 	   dlm_user_request */
3753 	unhold_lkb(lkb);
3754  out_put:
3755 	dlm_put_lkb(lkb);
3756  out:
3757 	unlock_recovery(ls);
3758 	return error;
3759 }
3760 
3761 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
3762 		    uint32_t flags, uint32_t lkid)
3763 {
3764 	struct dlm_lkb *lkb;
3765 	struct dlm_args args;
3766 	struct dlm_user_args *ua;
3767 	int error;
3768 
3769 	lock_recovery(ls);
3770 
3771 	error = find_lkb(ls, lkid, &lkb);
3772 	if (error)
3773 		goto out;
3774 
3775 	ua = (struct dlm_user_args *)lkb->lkb_astparam;
3776 	ua->castparam = ua_tmp->castparam;
3777 	ua->user_lksb = ua_tmp->user_lksb;
3778 
3779 	error = set_unlock_args(flags, ua, &args);
3780 	if (error)
3781 		goto out_put;
3782 
3783 	error = cancel_lock(ls, lkb, &args);
3784 
3785 	if (error == -DLM_ECANCEL)
3786 		error = 0;
3787 	if (error)
3788 		goto out_put;
3789 
3790 	/* this lkb was removed from the WAITING queue */
3791 	if (lkb->lkb_grmode == DLM_LOCK_IV) {
3792 		spin_lock(&ua->proc->locks_spin);
3793 		list_del_init(&lkb->lkb_ownqueue);
3794 		spin_unlock(&ua->proc->locks_spin);
3795 		unhold_lkb(lkb);
3796 	}
3797  out_put:
3798 	dlm_put_lkb(lkb);
3799  out:
3800 	unlock_recovery(ls);
3801 	return error;
3802 }
3803 
3804 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3805 {
3806 	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
3807 
3808 	if (ua->lksb.sb_lvbptr)
3809 		kfree(ua->lksb.sb_lvbptr);
3810 	kfree(ua);
3811 	lkb->lkb_astparam = (long)NULL;
3812 
3813 	/* TODO: propagate to master if needed */
3814 	return 0;
3815 }
3816 
3817 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
3818    Regardless of what rsb queue the lock is on, it's removed and freed. */
3819 
3820 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
3821 {
3822 	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
3823 	struct dlm_args args;
3824 	int error;
3825 
3826 	/* FIXME: we need to handle the case where the lkb is in limbo
3827 	   while the rsb is being looked up, currently we assert in
3828 	   _unlock_lock/is_remote because rsb nodeid is -1. */
3829 
3830 	set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
3831 
3832 	error = unlock_lock(ls, lkb, &args);
3833 	if (error == -DLM_EUNLOCK)
3834 		error = 0;
3835 	return error;
3836 }
3837 
3838 /* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
3839    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
3840    which we clear here. */
3841 
3842 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
3843    list, and no more device_writes should add lkb's to proc->locks list; so we
3844    shouldn't need to take asts_spin or locks_spin here.  this assumes that
3845    device reads/writes/closes are serialized -- FIXME: we may need to serialize
3846    them ourselves. */
3847 
3848 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
3849 {
3850 	struct dlm_lkb *lkb, *safe;
3851 
3852 	lock_recovery(ls);
3853 	mutex_lock(&ls->ls_clear_proc_locks);
3854 
3855 	list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) {
3856 		if (lkb->lkb_ast_type) {
3857 			list_del(&lkb->lkb_astqueue);
3858 			unhold_lkb(lkb);
3859 		}
3860 
3861 		list_del_init(&lkb->lkb_ownqueue);
3862 
3863 		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) {
3864 			lkb->lkb_flags |= DLM_IFL_ORPHAN;
3865 			orphan_proc_lock(ls, lkb);
3866 		} else {
3867 			lkb->lkb_flags |= DLM_IFL_DEAD;
3868 			unlock_proc_lock(ls, lkb);
3869 		}
3870 
3871 		/* this removes the reference for the proc->locks list
3872 		   added by dlm_user_request, it may result in the lkb
3873 		   being freed */
3874 
3875 		dlm_put_lkb(lkb);
3876 	}
3877 	mutex_unlock(&ls->ls_clear_proc_locks);
3878 	unlock_recovery(ls);
3879 }
3880