xref: /linux/fs/dlm/lock.c (revision 60e13231561b3a4c5269bfa1ef6c0569ad6f28ec)
1 /******************************************************************************
2 *******************************************************************************
3 **
4 **  Copyright (C) 2005-2010 Red Hat, Inc.  All rights reserved.
5 **
6 **  This copyrighted material is made available to anyone wishing to use,
7 **  modify, copy, or redistribute it subject to the terms and conditions
8 **  of the GNU General Public License v.2.
9 **
10 *******************************************************************************
11 ******************************************************************************/
12 
13 /* Central locking logic has four stages:
14 
15    dlm_lock()
16    dlm_unlock()
17 
18    request_lock(ls, lkb)
19    convert_lock(ls, lkb)
20    unlock_lock(ls, lkb)
21    cancel_lock(ls, lkb)
22 
23    _request_lock(r, lkb)
24    _convert_lock(r, lkb)
25    _unlock_lock(r, lkb)
26    _cancel_lock(r, lkb)
27 
28    do_request(r, lkb)
29    do_convert(r, lkb)
30    do_unlock(r, lkb)
31    do_cancel(r, lkb)
32 
33    Stage 1 (lock, unlock) is mainly about checking input args and
34    splitting into one of the four main operations:
35 
36        dlm_lock          = request_lock
37        dlm_lock+CONVERT  = convert_lock
38        dlm_unlock        = unlock_lock
39        dlm_unlock+CANCEL = cancel_lock
40 
41    Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42    provided to the next stage.
43 
44    Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45    When remote, it calls send_xxxx(), when local it calls do_xxxx().
46 
47    Stage 4, do_xxxx(), is the guts of the operation.  It manipulates the
48    given rsb and lkb and queues callbacks.
49 
50    For remote operations, send_xxxx() results in the corresponding do_xxxx()
51    function being executed on the remote node.  The connecting send/receive
52    calls on local (L) and remote (R) nodes:
53 
54    L: send_xxxx()              ->  R: receive_xxxx()
55                                    R: do_xxxx()
56    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
57 */
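
/* Illustrative sketch (not part of this file): a caller acquiring a lock
   and then converting it passes through the four stages like this.  The
   resource name and the my_ast/my_bast/my_arg callbacks are hypothetical;
   the dlm_lock() arguments follow the declaration in <linux/dlm.h>.

	struct dlm_lksb lksb;
	int error;

	// stages 1-4: dlm_lock -> request_lock -> _request_lock -> do_request
	error = dlm_lock(ls, DLM_LOCK_EX, &lksb, 0, "myres", 5, 0,
			 my_ast, my_arg, my_bast);

	// CONVERT path: dlm_lock -> convert_lock -> _convert_lock ->
	// do_convert (or send_convert when the rsb master is remote)
	error = dlm_lock(ls, DLM_LOCK_NL, &lksb, DLM_LKF_CONVERT,
			 "myres", 5, 0, my_ast, my_arg, my_bast);
*/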
58 #include <linux/types.h>
59 #include <linux/slab.h>
60 #include "dlm_internal.h"
61 #include <linux/dlm_device.h>
62 #include "memory.h"
63 #include "lowcomms.h"
64 #include "requestqueue.h"
65 #include "util.h"
66 #include "dir.h"
67 #include "member.h"
68 #include "lockspace.h"
69 #include "ast.h"
70 #include "lock.h"
71 #include "rcom.h"
72 #include "recover.h"
73 #include "lvb_table.h"
74 #include "user.h"
75 #include "config.h"
76 
77 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
78 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
79 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
80 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
81 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
82 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
83 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
84 static int send_remove(struct dlm_rsb *r);
85 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
86 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
87 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
88 				    struct dlm_message *ms);
89 static int receive_extralen(struct dlm_message *ms);
90 static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
91 static void del_timeout(struct dlm_lkb *lkb);
92 
93 /*
94  * Lock compatibility matrix - thanks Steve
95  * UN = Unlocked state. Not really a state, used as a flag
96  * PD = Padding. Used to make the matrix a nice power of two in size
97  * Other states are the same as the VMS DLM.
98  * Usage: matrix[grmode+1][rqmode+1]  (although m[rq+1][gr+1] is the same)
99  */
100 
101 static const int __dlm_compat_matrix[8][8] = {
102       /* UN NL CR CW PR PW EX PD */
103         {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
104         {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
105         {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
106         {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
107         {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
108         {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
109         {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
110         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
111 };
112 
113 /*
114  * This defines the direction of transfer of LVB data.
115  * Granted mode is the row; requested mode is the column.
116  * Usage: matrix[grmode+1][rqmode+1]
117  * 1 = LVB is returned to the caller
118  * 0 = LVB is written to the resource
119  * -1 = nothing happens to the LVB
120  */
121 
122 const int dlm_lvb_operations[8][8] = {
123         /* UN   NL  CR  CW  PR  PW  EX  PD */
124         {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
125         {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
126         {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
127         {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
128         {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
129         {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
130         {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
131         {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
132 };
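
/* Illustrative readings of the table above (indices are mode+1, with
   IV/UN = -1, NL = 0, CR = 1, CW = 2, PR = 3, PW = 4, EX = 5):

	dlm_lvb_operations[DLM_LOCK_IV + 1][DLM_LOCK_PR + 1] == 1
		new PR request: the LVB is copied back to the caller's lksb
	dlm_lvb_operations[DLM_LOCK_EX + 1][DLM_LOCK_NL + 1] == 0
		EX->NL down-conversion: the caller's LVB is written to the rsb
	dlm_lvb_operations[DLM_LOCK_IV + 1][DLM_LOCK_IV + 1] == -1
		nothing is transferred */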
133 
134 #define modes_compat(gr, rq) \
135 	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
136 
137 int dlm_modes_compat(int mode1, int mode2)
138 {
139 	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
140 }
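
/* For example, per the matrix above:

	dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PR) == 1   (readers share)
	dlm_modes_compat(DLM_LOCK_PR, DLM_LOCK_PW) == 0   (writer excluded)
	dlm_modes_compat(DLM_LOCK_EX, DLM_LOCK_NL) == 1   (NL compatible
							   with everything) */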
141 
142 /*
143  * Compatibility matrix for conversions with QUECVT set.
144  * Granted mode is the row; requested mode is the column.
145  * Usage: matrix[grmode+1][rqmode+1]
146  */
147 
148 static const int __quecvt_compat_matrix[8][8] = {
149       /* UN NL CR CW PR PW EX PD */
150         {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
151         {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
152         {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
153         {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
154         {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
155         {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
156         {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
157         {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
158 };
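
/* E.g. __quecvt_compat_matrix[DLM_LOCK_PR + 1][DLM_LOCK_CW + 1] == 1: a
   PR->CW conversion with DLM_LKF_QUECVT set is forced onto the convert
   queue, while the PR->CR entry is 0, so QUECVT does not apply to that
   pair (checked when conversion args are validated, see
   validate_lock_args() later in this file). */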
159 
160 void dlm_print_lkb(struct dlm_lkb *lkb)
161 {
162 	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
163 	       "     status %d rqmode %d grmode %d wait_type %d\n",
164 	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
165 	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
166 	       lkb->lkb_grmode, lkb->lkb_wait_type);
167 }
168 
169 static void dlm_print_rsb(struct dlm_rsb *r)
170 {
171 	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
172 	       r->res_nodeid, r->res_flags, r->res_first_lkid,
173 	       r->res_recover_locks_count, r->res_name);
174 }
175 
176 void dlm_dump_rsb(struct dlm_rsb *r)
177 {
178 	struct dlm_lkb *lkb;
179 
180 	dlm_print_rsb(r);
181 
182 	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
183 	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
184 	printk(KERN_ERR "rsb lookup list\n");
185 	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
186 		dlm_print_lkb(lkb);
187 	printk(KERN_ERR "rsb grant queue:\n");
188 	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
189 		dlm_print_lkb(lkb);
190 	printk(KERN_ERR "rsb convert queue:\n");
191 	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
192 		dlm_print_lkb(lkb);
193 	printk(KERN_ERR "rsb wait queue:\n");
194 	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
195 		dlm_print_lkb(lkb);
196 }
197 
198 /* Threads cannot use the lockspace while it's being recovered */
199 
200 static inline void dlm_lock_recovery(struct dlm_ls *ls)
201 {
202 	down_read(&ls->ls_in_recovery);
203 }
204 
205 void dlm_unlock_recovery(struct dlm_ls *ls)
206 {
207 	up_read(&ls->ls_in_recovery);
208 }
209 
210 int dlm_lock_recovery_try(struct dlm_ls *ls)
211 {
212 	return down_read_trylock(&ls->ls_in_recovery);
213 }
214 
215 static inline int can_be_queued(struct dlm_lkb *lkb)
216 {
217 	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
218 }
219 
220 static inline int force_blocking_asts(struct dlm_lkb *lkb)
221 {
222 	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
223 }
224 
225 static inline int is_demoted(struct dlm_lkb *lkb)
226 {
227 	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
228 }
229 
230 static inline int is_altmode(struct dlm_lkb *lkb)
231 {
232 	return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
233 }
234 
235 static inline int is_granted(struct dlm_lkb *lkb)
236 {
237 	return (lkb->lkb_status == DLM_LKSTS_GRANTED);
238 }
239 
240 static inline int is_remote(struct dlm_rsb *r)
241 {
242 	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
243 	return !!r->res_nodeid;
244 }
245 
246 static inline int is_process_copy(struct dlm_lkb *lkb)
247 {
248 	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
249 }
250 
251 static inline int is_master_copy(struct dlm_lkb *lkb)
252 {
253 	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
254 		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
255 	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
256 }
257 
258 static inline int middle_conversion(struct dlm_lkb *lkb)
259 {
260 	if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
261 	    (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
262 		return 1;
263 	return 0;
264 }
265 
266 static inline int down_conversion(struct dlm_lkb *lkb)
267 {
268 	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
269 }
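
/* Examples: grmode EX -> rqmode PR is a down-conversion (the new mode is
   strictly weaker).  grmode PR -> rqmode CW is a "middle" conversion:
   neither mode includes the other (PR and CW are mutually incompatible in
   the matrix above), so it cannot be granted in place the way a pure
   down-conversion can and is handled like an up-conversion. */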
270 
271 static inline int is_overlap_unlock(struct dlm_lkb *lkb)
272 {
273 	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
274 }
275 
276 static inline int is_overlap_cancel(struct dlm_lkb *lkb)
277 {
278 	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
279 }
280 
281 static inline int is_overlap(struct dlm_lkb *lkb)
282 {
283 	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
284 				  DLM_IFL_OVERLAP_CANCEL));
285 }
286 
287 static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
288 {
289 	if (is_master_copy(lkb))
290 		return;
291 
292 	del_timeout(lkb);
293 
294 	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
295 
296 	/* if the operation was a cancel, return -DLM_ECANCEL; if a timeout
297 	   caused the cancel, return -ETIMEDOUT instead */
298 	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
299 		lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
300 		rv = -ETIMEDOUT;
301 	}
302 
303 	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_DEADLOCK_CANCEL)) {
304 		lkb->lkb_flags &= ~DLM_IFL_DEADLOCK_CANCEL;
305 		rv = -EDEADLK;
306 	}
307 
308 	dlm_add_cb(lkb, DLM_CB_CAST, lkb->lkb_grmode, rv, lkb->lkb_sbflags);
309 }
310 
311 static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
312 {
313 	queue_cast(r, lkb,
314 		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
315 }
316 
317 static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
318 {
319 	if (is_master_copy(lkb)) {
320 		send_bast(r, lkb, rqmode);
321 	} else {
322 		dlm_add_cb(lkb, DLM_CB_BAST, rqmode, 0, 0);
323 	}
324 }
325 
326 /*
327  * Basic operations on rsb's and lkb's
328  */
329 
330 static int pre_rsb_struct(struct dlm_ls *ls)
331 {
332 	struct dlm_rsb *r1, *r2;
333 	int count = 0;
334 
335 	spin_lock(&ls->ls_new_rsb_spin);
336 	if (ls->ls_new_rsb_count > dlm_config.ci_new_rsb_count / 2) {
337 		spin_unlock(&ls->ls_new_rsb_spin);
338 		return 0;
339 	}
340 	spin_unlock(&ls->ls_new_rsb_spin);
341 
342 	r1 = dlm_allocate_rsb(ls);
343 	r2 = dlm_allocate_rsb(ls);
344 
345 	spin_lock(&ls->ls_new_rsb_spin);
346 	if (r1) {
347 		list_add(&r1->res_hashchain, &ls->ls_new_rsb);
348 		ls->ls_new_rsb_count++;
349 	}
350 	if (r2) {
351 		list_add(&r2->res_hashchain, &ls->ls_new_rsb);
352 		ls->ls_new_rsb_count++;
353 	}
354 	count = ls->ls_new_rsb_count;
355 	spin_unlock(&ls->ls_new_rsb_spin);
356 
357 	if (!count)
358 		return -ENOMEM;
359 	return 0;
360 }
361 
362 /* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can
363    unlock any spinlocks, go back and call pre_rsb_struct again.
364    Otherwise, take an rsb off the list and return it. */
365 
366 static int get_rsb_struct(struct dlm_ls *ls, char *name, int len,
367 			  struct dlm_rsb **r_ret)
368 {
369 	struct dlm_rsb *r;
370 	int count;
371 
372 	spin_lock(&ls->ls_new_rsb_spin);
373 	if (list_empty(&ls->ls_new_rsb)) {
374 		count = ls->ls_new_rsb_count;
375 		spin_unlock(&ls->ls_new_rsb_spin);
376 		log_debug(ls, "find_rsb retry %d %d %s",
377 			  count, dlm_config.ci_new_rsb_count, name);
378 		return -EAGAIN;
379 	}
380 
381 	r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
382 	list_del(&r->res_hashchain);
383 	ls->ls_new_rsb_count--;
384 	spin_unlock(&ls->ls_new_rsb_spin);
385 
386 	r->res_ls = ls;
387 	r->res_length = len;
388 	memcpy(r->res_name, name, len);
389 	mutex_init(&r->res_mutex);
390 
391 	INIT_LIST_HEAD(&r->res_hashchain);
392 	INIT_LIST_HEAD(&r->res_lookup);
393 	INIT_LIST_HEAD(&r->res_grantqueue);
394 	INIT_LIST_HEAD(&r->res_convertqueue);
395 	INIT_LIST_HEAD(&r->res_waitqueue);
396 	INIT_LIST_HEAD(&r->res_root_list);
397 	INIT_LIST_HEAD(&r->res_recover_list);
398 
399 	*r_ret = r;
400 	return 0;
401 }
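
/* Note: pre_rsb_struct()/get_rsb_struct() form a two-step allocation:
   rsb's are preallocated with pre_rsb_struct() while no spinlock is held,
   then one is consumed under the rsbtbl bucket spinlock.  find_rsb()
   below shows the retry loop: on -EAGAIN it drops the bucket lock, calls
   pre_rsb_struct() again, and retries. */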
402 
403 static int search_rsb_list(struct list_head *head, char *name, int len,
404 			   unsigned int flags, struct dlm_rsb **r_ret)
405 {
406 	struct dlm_rsb *r;
407 	int error = 0;
408 
409 	list_for_each_entry(r, head, res_hashchain) {
410 		if (len == r->res_length && !memcmp(name, r->res_name, len))
411 			goto found;
412 	}
413 	*r_ret = NULL;
414 	return -EBADR;
415 
416  found:
417 	if (r->res_nodeid && (flags & R_MASTER))
418 		error = -ENOTBLK;
419 	*r_ret = r;
420 	return error;
421 }
422 
423 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
424 		       unsigned int flags, struct dlm_rsb **r_ret)
425 {
426 	struct dlm_rsb *r;
427 	int error;
428 
429 	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
430 	if (!error) {
431 		kref_get(&r->res_ref);
432 		goto out;
433 	}
434 	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
435 	if (error)
436 		goto out;
437 
438 	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
439 
440 	if (dlm_no_directory(ls))
441 		goto out;
442 
443 	if (r->res_nodeid == -1) {
444 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
445 		r->res_first_lkid = 0;
446 	} else if (r->res_nodeid > 0) {
447 		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
448 		r->res_first_lkid = 0;
449 	} else {
450 		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
451 		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
452 	}
453  out:
454 	*r_ret = r;
455 	return error;
456 }
457 
458 /*
459  * Find rsb in rsbtbl and potentially create/add one
460  *
461  * Delaying the release of rsb's has a similar benefit to applications keeping
462  * NL locks on an rsb, but without the guarantee that the cached master value
463  * will still be valid when the rsb is reused.  Apps aren't always smart enough
464  * to keep NL locks on an rsb that they may lock again shortly; this can lead
465  * to excessive master lookups and removals if we don't delay the release.
466  *
467  * Searching for an rsb means looking through both the normal list and toss
468  * list.  When found on the toss list the rsb is moved to the normal list with
469  * ref count of 1; when found on normal list the ref count is incremented.
470  */
471 
472 static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
473 		    unsigned int flags, struct dlm_rsb **r_ret)
474 {
475 	struct dlm_rsb *r = NULL;
476 	uint32_t hash, bucket;
477 	int error;
478 
479 	if (namelen > DLM_RESNAME_MAXLEN) {
480 		error = -EINVAL;
481 		goto out;
482 	}
483 
484 	if (dlm_no_directory(ls))
485 		flags |= R_CREATE;
486 
487 	hash = jhash(name, namelen, 0);
488 	bucket = hash & (ls->ls_rsbtbl_size - 1);
489 
490  retry:
491 	if (flags & R_CREATE) {
492 		error = pre_rsb_struct(ls);
493 		if (error < 0)
494 			goto out;
495 	}
496 
497 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
498 
499 	error = _search_rsb(ls, name, namelen, bucket, flags, &r);
500 	if (!error)
501 		goto out_unlock;
502 
503 	if (error == -EBADR && !(flags & R_CREATE))
504 		goto out_unlock;
505 
506 	/* the rsb was found but wasn't a master copy */
507 	if (error == -ENOTBLK)
508 		goto out_unlock;
509 
510 	error = get_rsb_struct(ls, name, namelen, &r);
511 	if (error == -EAGAIN) {
512 		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
513 		goto retry;
514 	}
515 	if (error)
516 		goto out_unlock;
517 
518 	r->res_hash = hash;
519 	r->res_bucket = bucket;
520 	r->res_nodeid = -1;
521 	kref_init(&r->res_ref);
522 
523 	/* With no directory, the master can be set immediately */
524 	if (dlm_no_directory(ls)) {
525 		int nodeid = dlm_dir_nodeid(r);
526 		if (nodeid == dlm_our_nodeid())
527 			nodeid = 0;
528 		r->res_nodeid = nodeid;
529 	}
530 	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
531 	error = 0;
532  out_unlock:
533 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
534  out:
535 	*r_ret = r;
536 	return error;
537 }
538 
539 /* This is only called to add a reference when the code already holds
540    a valid reference to the rsb, so there's no need for locking. */
541 
542 static inline void hold_rsb(struct dlm_rsb *r)
543 {
544 	kref_get(&r->res_ref);
545 }
546 
547 void dlm_hold_rsb(struct dlm_rsb *r)
548 {
549 	hold_rsb(r);
550 }
551 
552 static void toss_rsb(struct kref *kref)
553 {
554 	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
555 	struct dlm_ls *ls = r->res_ls;
556 
557 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
558 	kref_init(&r->res_ref);
559 	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
560 	r->res_toss_time = jiffies;
561 	if (r->res_lvbptr) {
562 		dlm_free_lvb(r->res_lvbptr);
563 		r->res_lvbptr = NULL;
564 	}
565 }
566 
567 /* When all references to the rsb are gone it's transferred to
568    the tossed list for later disposal. */
569 
570 static void put_rsb(struct dlm_rsb *r)
571 {
572 	struct dlm_ls *ls = r->res_ls;
573 	uint32_t bucket = r->res_bucket;
574 
575 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
576 	kref_put(&r->res_ref, toss_rsb);
577 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
578 }
579 
580 void dlm_put_rsb(struct dlm_rsb *r)
581 {
582 	put_rsb(r);
583 }
584 
585 /* See comment for unhold_lkb */
586 
587 static void unhold_rsb(struct dlm_rsb *r)
588 {
589 	int rv;
590 	rv = kref_put(&r->res_ref, toss_rsb);
591 	DLM_ASSERT(!rv, dlm_dump_rsb(r););
592 }
593 
594 static void kill_rsb(struct kref *kref)
595 {
596 	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
597 
598 	/* All work is done after the return from kref_put() so we
599 	   can release the write_lock before the remove and free. */
600 
601 	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
602 	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
603 	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
604 	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
605 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
606 	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
607 }
608 
609 /* Attaching/detaching lkb's from rsb's is for rsb reference counting.
610    The rsb must exist as long as any lkb's for it do. */
611 
612 static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
613 {
614 	hold_rsb(r);
615 	lkb->lkb_resource = r;
616 }
617 
618 static void detach_lkb(struct dlm_lkb *lkb)
619 {
620 	if (lkb->lkb_resource) {
621 		put_rsb(lkb->lkb_resource);
622 		lkb->lkb_resource = NULL;
623 	}
624 }
625 
626 static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
627 {
628 	struct dlm_lkb *lkb;
629 	int rv, id;
630 
631 	lkb = dlm_allocate_lkb(ls);
632 	if (!lkb)
633 		return -ENOMEM;
634 
635 	lkb->lkb_nodeid = -1;
636 	lkb->lkb_grmode = DLM_LOCK_IV;
637 	kref_init(&lkb->lkb_ref);
638 	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
639 	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
640 	INIT_LIST_HEAD(&lkb->lkb_time_list);
641 	INIT_LIST_HEAD(&lkb->lkb_cb_list);
642 	mutex_init(&lkb->lkb_cb_mutex);
643 	INIT_WORK(&lkb->lkb_cb_work, dlm_callback_work);
644 
645  retry:
646 	rv = idr_pre_get(&ls->ls_lkbidr, GFP_NOFS);
647 	if (!rv)
648 		return -ENOMEM;
649 
650 	spin_lock(&ls->ls_lkbidr_spin);
651 	rv = idr_get_new_above(&ls->ls_lkbidr, lkb, 1, &id);
652 	if (!rv)
653 		lkb->lkb_id = id;
654 	spin_unlock(&ls->ls_lkbidr_spin);
655 
656 	if (rv == -EAGAIN)
657 		goto retry;
658 
659 	if (rv < 0) {
660 		log_error(ls, "create_lkb idr error %d", rv);
661 		return rv;
662 	}
663 
664 	*lkb_ret = lkb;
665 	return 0;
666 }
667 
668 static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
669 {
670 	struct dlm_lkb *lkb;
671 
672 	spin_lock(&ls->ls_lkbidr_spin);
673 	lkb = idr_find(&ls->ls_lkbidr, lkid);
674 	if (lkb)
675 		kref_get(&lkb->lkb_ref);
676 	spin_unlock(&ls->ls_lkbidr_spin);
677 
678 	*lkb_ret = lkb;
679 	return lkb ? 0 : -ENOENT;
680 }
681 
682 static void kill_lkb(struct kref *kref)
683 {
684 	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
685 
686 	/* All work is done after the return from kref_put() so we
687 	   can release the write_lock before the detach_lkb */
688 
689 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
690 }
691 
692 /* __put_lkb() is used when an lkb may not have an rsb attached to
693    it so we need to provide the lockspace explicitly */
694 
695 static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
696 {
697 	uint32_t lkid = lkb->lkb_id;
698 
699 	spin_lock(&ls->ls_lkbidr_spin);
700 	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
701 		idr_remove(&ls->ls_lkbidr, lkid);
702 		spin_unlock(&ls->ls_lkbidr_spin);
703 
704 		detach_lkb(lkb);
705 
706 		/* for local/process lkbs, lvbptr points to caller's lksb */
707 		if (lkb->lkb_lvbptr && is_master_copy(lkb))
708 			dlm_free_lvb(lkb->lkb_lvbptr);
709 		dlm_free_lkb(lkb);
710 		return 1;
711 	} else {
712 		spin_unlock(&ls->ls_lkbidr_spin);
713 		return 0;
714 	}
715 }
716 
717 int dlm_put_lkb(struct dlm_lkb *lkb)
718 {
719 	struct dlm_ls *ls;
720 
721 	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
722 	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
723 
724 	ls = lkb->lkb_resource->res_ls;
725 	return __put_lkb(ls, lkb);
726 }
727 
728 /* This is only called to add a reference when the code already holds
729    a valid reference to the lkb, so there's no need for locking. */
730 
731 static inline void hold_lkb(struct dlm_lkb *lkb)
732 {
733 	kref_get(&lkb->lkb_ref);
734 }
735 
736 /* This is called when we need to remove a reference and are certain
737    it's not the last ref.  e.g. del_lkb is always called between a
738    find_lkb/put_lkb and is always the inverse of a previous add_lkb.
739    put_lkb would work fine, but would involve unnecessary locking */
740 
741 static inline void unhold_lkb(struct dlm_lkb *lkb)
742 {
743 	int rv;
744 	rv = kref_put(&lkb->lkb_ref, kill_lkb);
745 	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
746 }
747 
748 static void lkb_add_ordered(struct list_head *new, struct list_head *head,
749 			    int mode)
750 {
751 	struct dlm_lkb *lkb = NULL;
752 
753 	list_for_each_entry(lkb, head, lkb_statequeue)
754 		if (lkb->lkb_rqmode < mode)
755 			break;
756 
757 	__list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
758 }
759 
760 /* add/remove lkb to rsb's grant/convert/wait queue */
761 
762 static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
763 {
764 	kref_get(&lkb->lkb_ref);
765 
766 	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
767 
768 	lkb->lkb_timestamp = ktime_get();
769 
770 	lkb->lkb_status = status;
771 
772 	switch (status) {
773 	case DLM_LKSTS_WAITING:
774 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
775 			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
776 		else
777 			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
778 		break;
779 	case DLM_LKSTS_GRANTED:
780 		/* convention says granted locks kept in order of grmode */
781 		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
782 				lkb->lkb_grmode);
783 		break;
784 	case DLM_LKSTS_CONVERT:
785 		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
786 			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
787 		else
788 			list_add_tail(&lkb->lkb_statequeue,
789 				      &r->res_convertqueue);
790 		break;
791 	default:
792 		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
793 	}
794 }
795 
796 static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
797 {
798 	lkb->lkb_status = 0;
799 	list_del(&lkb->lkb_statequeue);
800 	unhold_lkb(lkb);
801 }
802 
803 static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
804 {
805 	hold_lkb(lkb);
806 	del_lkb(r, lkb);
807 	add_lkb(r, lkb, sts);
808 	unhold_lkb(lkb);
809 }
810 
811 static int msg_reply_type(int mstype)
812 {
813 	switch (mstype) {
814 	case DLM_MSG_REQUEST:
815 		return DLM_MSG_REQUEST_REPLY;
816 	case DLM_MSG_CONVERT:
817 		return DLM_MSG_CONVERT_REPLY;
818 	case DLM_MSG_UNLOCK:
819 		return DLM_MSG_UNLOCK_REPLY;
820 	case DLM_MSG_CANCEL:
821 		return DLM_MSG_CANCEL_REPLY;
822 	case DLM_MSG_LOOKUP:
823 		return DLM_MSG_LOOKUP_REPLY;
824 	}
825 	return -1;
826 }
827 
828 static int nodeid_warned(int nodeid, int num_nodes, int *warned)
829 {
830 	int i;
831 
832 	for (i = 0; i < num_nodes; i++) {
833 		if (!warned[i]) {
834 			warned[i] = nodeid;
835 			return 0;
836 		}
837 		if (warned[i] == nodeid)
838 			return 1;
839 	}
840 	return 0;
841 }
842 
843 void dlm_scan_waiters(struct dlm_ls *ls)
844 {
845 	struct dlm_lkb *lkb;
846 	ktime_t zero = ktime_set(0, 0);
847 	s64 us;
848 	s64 debug_maxus = 0;
849 	u32 debug_scanned = 0;
850 	u32 debug_expired = 0;
851 	int num_nodes = 0;
852 	int *warned = NULL;
853 
854 	if (!dlm_config.ci_waitwarn_us)
855 		return;
856 
857 	mutex_lock(&ls->ls_waiters_mutex);
858 
859 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
860 		if (ktime_equal(lkb->lkb_wait_time, zero))
861 			continue;
862 
863 		debug_scanned++;
864 
865 		us = ktime_to_us(ktime_sub(ktime_get(), lkb->lkb_wait_time));
866 
867 		if (us < dlm_config.ci_waitwarn_us)
868 			continue;
869 
870 		lkb->lkb_wait_time = zero;
871 
872 		debug_expired++;
873 		if (us > debug_maxus)
874 			debug_maxus = us;
875 
876 		if (!num_nodes) {
877 			num_nodes = ls->ls_num_nodes;
878 			warned = kzalloc(num_nodes * sizeof(int), GFP_KERNEL);
879 		}
880 		if (!warned)
881 			continue;
882 		if (nodeid_warned(lkb->lkb_wait_nodeid, num_nodes, warned))
883 			continue;
884 
885 		log_error(ls, "waitwarn %x %lld %d us check connection to "
886 			  "node %d", lkb->lkb_id, (long long)us,
887 			  dlm_config.ci_waitwarn_us, lkb->lkb_wait_nodeid);
888 	}
889 	mutex_unlock(&ls->ls_waiters_mutex);
890 	kfree(warned);
891 
892 	if (debug_expired)
893 		log_debug(ls, "scan_waiters %u warn %u over %d us max %lld us",
894 			  debug_scanned, debug_expired,
895 			  dlm_config.ci_waitwarn_us, (long long)debug_maxus);
896 }
897 
898 /* add/remove lkb from global waiters list of lkb's waiting for
899    a reply from a remote node */
900 
901 static int add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
902 {
903 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
904 	int error = 0;
905 
906 	mutex_lock(&ls->ls_waiters_mutex);
907 
908 	if (is_overlap_unlock(lkb) ||
909 	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
910 		error = -EINVAL;
911 		goto out;
912 	}
913 
914 	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
915 		switch (mstype) {
916 		case DLM_MSG_UNLOCK:
917 			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
918 			break;
919 		case DLM_MSG_CANCEL:
920 			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
921 			break;
922 		default:
923 			error = -EBUSY;
924 			goto out;
925 		}
926 		lkb->lkb_wait_count++;
927 		hold_lkb(lkb);
928 
929 		log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
930 			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
931 			  lkb->lkb_wait_count, lkb->lkb_flags);
932 		goto out;
933 	}
934 
935 	DLM_ASSERT(!lkb->lkb_wait_count,
936 		   dlm_print_lkb(lkb);
937 		   printk("wait_count %d\n", lkb->lkb_wait_count););
938 
939 	lkb->lkb_wait_count++;
940 	lkb->lkb_wait_type = mstype;
941 	lkb->lkb_wait_time = ktime_get();
942 	lkb->lkb_wait_nodeid = to_nodeid; /* for debugging */
943 	hold_lkb(lkb);
944 	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
945  out:
946 	if (error)
947 		log_error(ls, "addwait error %x %d flags %x %d %d %s",
948 			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
949 			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
950 	mutex_unlock(&ls->ls_waiters_mutex);
951 	return error;
952 }
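
/* Example of the overlap handling above (illustrative): an application
   issues a convert and then a force-unlock before the convert reply
   arrives.  The second add_to_waiters() call sees lkb_wait_type ==
   DLM_MSG_CONVERT, so it sets DLM_IFL_OVERLAP_UNLOCK and bumps
   lkb_wait_count instead of failing; _remove_from_waiters() below
   reconciles whichever reply arrives first. */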
953 
954 /* We clear the RESEND flag because we might be taking an lkb off the waiters
955    list as part of process_requestqueue (e.g. a lookup that has an optimized
956    request reply on the requestqueue) between dlm_recover_waiters_pre() which
957    set RESEND and dlm_recover_waiters_post() */
958 
959 static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype,
960 				struct dlm_message *ms)
961 {
962 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
963 	int overlap_done = 0;
964 
965 	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
966 		log_debug(ls, "remwait %x unlock_reply overlap", lkb->lkb_id);
967 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
968 		overlap_done = 1;
969 		goto out_del;
970 	}
971 
972 	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
973 		log_debug(ls, "remwait %x cancel_reply overlap", lkb->lkb_id);
974 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
975 		overlap_done = 1;
976 		goto out_del;
977 	}
978 
979 	/* Cancel state was preemptively cleared by a successful convert,
980 	   see next comment, nothing to do. */
981 
982 	if ((mstype == DLM_MSG_CANCEL_REPLY) &&
983 	    (lkb->lkb_wait_type != DLM_MSG_CANCEL)) {
984 		log_debug(ls, "remwait %x cancel_reply wait_type %d",
985 			  lkb->lkb_id, lkb->lkb_wait_type);
986 		return -1;
987 	}
988 
989 	/* Remove for the convert reply, and preemptively remove for the
990 	   cancel reply.  A convert has been granted while there's still
991 	   an outstanding cancel on it (the cancel is moot and the result
992 	   in the cancel reply should be 0).  We preempt the cancel reply
993 	   because the app gets the convert result and then can follow up
994 	   with another op, like convert.  This subsequent op would see the
995 	   lingering state of the cancel and fail with -EBUSY. */
996 
997 	if ((mstype == DLM_MSG_CONVERT_REPLY) &&
998 	    (lkb->lkb_wait_type == DLM_MSG_CONVERT) &&
999 	    is_overlap_cancel(lkb) && ms && !ms->m_result) {
1000 		log_debug(ls, "remwait %x convert_reply zap overlap_cancel",
1001 			  lkb->lkb_id);
1002 		lkb->lkb_wait_type = 0;
1003 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
1004 		lkb->lkb_wait_count--;
1005 		goto out_del;
1006 	}
1007 
1008 	/* N.B. type of reply may not always correspond to type of original
1009 	   msg due to lookup->request optimization, verify others? */
1010 
1011 	if (lkb->lkb_wait_type) {
1012 		lkb->lkb_wait_type = 0;
1013 		goto out_del;
1014 	}
1015 
1016 	log_error(ls, "remwait error %x reply %d flags %x no wait_type",
1017 		  lkb->lkb_id, mstype, lkb->lkb_flags);
1018 	return -1;
1019 
1020  out_del:
1021 	/* the force-unlock/cancel has completed and we haven't received a reply
1022 	   to the op that was in progress prior to the unlock/cancel; we
1023 	   give up on any reply to the earlier op.  FIXME: not sure when/how
1024 	   this would happen */
1025 
1026 	if (overlap_done && lkb->lkb_wait_type) {
1027 		log_error(ls, "remwait error %x reply %d wait_type %d overlap",
1028 			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
1029 		lkb->lkb_wait_count--;
1030 		lkb->lkb_wait_type = 0;
1031 	}
1032 
1033 	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
1034 
1035 	lkb->lkb_flags &= ~DLM_IFL_RESEND;
1036 	lkb->lkb_wait_count--;
1037 	if (!lkb->lkb_wait_count)
1038 		list_del_init(&lkb->lkb_wait_reply);
1039 	unhold_lkb(lkb);
1040 	return 0;
1041 }
1042 
1043 static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
1044 {
1045 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1046 	int error;
1047 
1048 	mutex_lock(&ls->ls_waiters_mutex);
1049 	error = _remove_from_waiters(lkb, mstype, NULL);
1050 	mutex_unlock(&ls->ls_waiters_mutex);
1051 	return error;
1052 }
1053 
1054 /* Handles situations where we might be processing a "fake" or "stub" reply, in
1055    which case we can't take waiters_mutex again. */
1056 
1057 static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
1058 {
1059 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1060 	int error;
1061 
1062 	if (ms->m_flags != DLM_IFL_STUB_MS)
1063 		mutex_lock(&ls->ls_waiters_mutex);
1064 	error = _remove_from_waiters(lkb, ms->m_type, ms);
1065 	if (ms->m_flags != DLM_IFL_STUB_MS)
1066 		mutex_unlock(&ls->ls_waiters_mutex);
1067 	return error;
1068 }
1069 
1070 static void dir_remove(struct dlm_rsb *r)
1071 {
1072 	int to_nodeid;
1073 
1074 	if (dlm_no_directory(r->res_ls))
1075 		return;
1076 
1077 	to_nodeid = dlm_dir_nodeid(r);
1078 	if (to_nodeid != dlm_our_nodeid())
1079 		send_remove(r);
1080 	else
1081 		dlm_dir_remove_entry(r->res_ls, to_nodeid,
1082 				     r->res_name, r->res_length);
1083 }
1084 
1085 /* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
1086    found since they are in order of newest to oldest? */
1087 
1088 static int shrink_bucket(struct dlm_ls *ls, int b)
1089 {
1090 	struct dlm_rsb *r;
1091 	int count = 0, found;
1092 
1093 	for (;;) {
1094 		found = 0;
1095 		spin_lock(&ls->ls_rsbtbl[b].lock);
1096 		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
1097 					    res_hashchain) {
1098 			if (!time_after_eq(jiffies, r->res_toss_time +
1099 					   dlm_config.ci_toss_secs * HZ))
1100 				continue;
1101 			found = 1;
1102 			break;
1103 		}
1104 
1105 		if (!found) {
1106 			spin_unlock(&ls->ls_rsbtbl[b].lock);
1107 			break;
1108 		}
1109 
1110 		if (kref_put(&r->res_ref, kill_rsb)) {
1111 			list_del(&r->res_hashchain);
1112 			spin_unlock(&ls->ls_rsbtbl[b].lock);
1113 
1114 			if (is_master(r))
1115 				dir_remove(r);
1116 			dlm_free_rsb(r);
1117 			count++;
1118 		} else {
1119 			spin_unlock(&ls->ls_rsbtbl[b].lock);
1120 			log_error(ls, "tossed rsb in use %s", r->res_name);
1121 		}
1122 	}
1123 
1124 	return count;
1125 }
1126 
1127 void dlm_scan_rsbs(struct dlm_ls *ls)
1128 {
1129 	int i;
1130 
1131 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1132 		shrink_bucket(ls, i);
1133 		if (dlm_locking_stopped(ls))
1134 			break;
1135 		cond_resched();
1136 	}
1137 }
1138 
1139 static void add_timeout(struct dlm_lkb *lkb)
1140 {
1141 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1142 
1143 	if (is_master_copy(lkb))
1144 		return;
1145 
1146 	if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1147 	    !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1148 		lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1149 		goto add_it;
1150 	}
1151 	if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1152 		goto add_it;
1153 	return;
1154 
1155  add_it:
1156 	DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1157 	mutex_lock(&ls->ls_timeout_mutex);
1158 	hold_lkb(lkb);
1159 	list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1160 	mutex_unlock(&ls->ls_timeout_mutex);
1161 }
1162 
1163 static void del_timeout(struct dlm_lkb *lkb)
1164 {
1165 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1166 
1167 	mutex_lock(&ls->ls_timeout_mutex);
1168 	if (!list_empty(&lkb->lkb_time_list)) {
1169 		list_del_init(&lkb->lkb_time_list);
1170 		unhold_lkb(lkb);
1171 	}
1172 	mutex_unlock(&ls->ls_timeout_mutex);
1173 }
1174 
1175 /* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1176    lkb_lksb_timeout without lock_rsb?  Note: we can't lock timeout_mutex
1177    and then lock rsb because of lock ordering in add_timeout.  We may need
1178    to specify some special timeout-related bits in the lkb that are just to
1179    be accessed under the timeout_mutex. */
1180 
1181 void dlm_scan_timeout(struct dlm_ls *ls)
1182 {
1183 	struct dlm_rsb *r;
1184 	struct dlm_lkb *lkb;
1185 	int do_cancel, do_warn;
1186 	s64 wait_us;
1187 
1188 	for (;;) {
1189 		if (dlm_locking_stopped(ls))
1190 			break;
1191 
1192 		do_cancel = 0;
1193 		do_warn = 0;
1194 		mutex_lock(&ls->ls_timeout_mutex);
1195 		list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1196 
1197 			wait_us = ktime_to_us(ktime_sub(ktime_get(),
1198 						lkb->lkb_timestamp));
1199 
1200 			if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1201 			    wait_us >= (lkb->lkb_timeout_cs * 10000))
1202 				do_cancel = 1;
1203 
1204 			if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1205 			    wait_us >= dlm_config.ci_timewarn_cs * 10000)
1206 				do_warn = 1;
1207 
1208 			if (!do_cancel && !do_warn)
1209 				continue;
1210 			hold_lkb(lkb);
1211 			break;
1212 		}
1213 		mutex_unlock(&ls->ls_timeout_mutex);
1214 
1215 		if (!do_cancel && !do_warn)
1216 			break;
1217 
1218 		r = lkb->lkb_resource;
1219 		hold_rsb(r);
1220 		lock_rsb(r);
1221 
1222 		if (do_warn) {
1223 			/* clear flag so we only warn once */
1224 			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1225 			if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1226 				del_timeout(lkb);
1227 			dlm_timeout_warn(lkb);
1228 		}
1229 
1230 		if (do_cancel) {
1231 			log_debug(ls, "timeout cancel %x node %d %s",
1232 				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1233 			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1234 			lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1235 			del_timeout(lkb);
1236 			_cancel_lock(r, lkb);
1237 		}
1238 
1239 		unlock_rsb(r);
1240 		unhold_rsb(r);
1241 		dlm_put_lkb(lkb);
1242 	}
1243 }
1244 
1245 /* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1246    dlm_recoverd before checking/setting ls_recover_begin. */
1247 
1248 void dlm_adjust_timeouts(struct dlm_ls *ls)
1249 {
1250 	struct dlm_lkb *lkb;
1251 	u64 adj_us = jiffies_to_usecs(jiffies - ls->ls_recover_begin);
1252 
1253 	ls->ls_recover_begin = 0;
1254 	mutex_lock(&ls->ls_timeout_mutex);
1255 	list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1256 		lkb->lkb_timestamp = ktime_add_us(lkb->lkb_timestamp, adj_us);
1257 	mutex_unlock(&ls->ls_timeout_mutex);
1258 
1259 	if (!dlm_config.ci_waitwarn_us)
1260 		return;
1261 
1262 	mutex_lock(&ls->ls_waiters_mutex);
1263 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
1264 		if (ktime_to_us(lkb->lkb_wait_time))
1265 			lkb->lkb_wait_time = ktime_get();
1266 	}
1267 	mutex_unlock(&ls->ls_waiters_mutex);
1268 }
1269 
1270 /* lkb is master or local copy */
1271 
1272 static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1273 {
1274 	int b, len = r->res_ls->ls_lvblen;
1275 
1276 	/* b=1 lvb returned to caller
1277 	   b=0 lvb written to rsb or invalidated
1278 	   b=-1 do nothing */
1279 
1280 	b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1281 
1282 	if (b == 1) {
1283 		if (!lkb->lkb_lvbptr)
1284 			return;
1285 
1286 		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1287 			return;
1288 
1289 		if (!r->res_lvbptr)
1290 			return;
1291 
1292 		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1293 		lkb->lkb_lvbseq = r->res_lvbseq;
1294 
1295 	} else if (b == 0) {
1296 		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1297 			rsb_set_flag(r, RSB_VALNOTVALID);
1298 			return;
1299 		}
1300 
1301 		if (!lkb->lkb_lvbptr)
1302 			return;
1303 
1304 		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1305 			return;
1306 
1307 		if (!r->res_lvbptr)
1308 			r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1309 
1310 		if (!r->res_lvbptr)
1311 			return;
1312 
1313 		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1314 		r->res_lvbseq++;
1315 		lkb->lkb_lvbseq = r->res_lvbseq;
1316 		rsb_clear_flag(r, RSB_VALNOTVALID);
1317 	}
1318 
1319 	if (rsb_flag(r, RSB_VALNOTVALID))
1320 		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1321 }
1322 
1323 static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1324 {
1325 	if (lkb->lkb_grmode < DLM_LOCK_PW)
1326 		return;
1327 
1328 	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1329 		rsb_set_flag(r, RSB_VALNOTVALID);
1330 		return;
1331 	}
1332 
1333 	if (!lkb->lkb_lvbptr)
1334 		return;
1335 
1336 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1337 		return;
1338 
1339 	if (!r->res_lvbptr)
1340 		r->res_lvbptr = dlm_allocate_lvb(r->res_ls);
1341 
1342 	if (!r->res_lvbptr)
1343 		return;
1344 
1345 	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1346 	r->res_lvbseq++;
1347 	rsb_clear_flag(r, RSB_VALNOTVALID);
1348 }
1349 
1350 /* lkb is process copy (pc) */
1351 
1352 static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1353 			    struct dlm_message *ms)
1354 {
1355 	int b;
1356 
1357 	if (!lkb->lkb_lvbptr)
1358 		return;
1359 
1360 	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1361 		return;
1362 
1363 	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1364 	if (b == 1) {
1365 		int len = receive_extralen(ms);
1366 		if (len > DLM_RESNAME_MAXLEN)
1367 			len = DLM_RESNAME_MAXLEN;
1368 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1369 		lkb->lkb_lvbseq = ms->m_lvbseq;
1370 	}
1371 }
1372 
1373 /* Manipulate lkb's on rsb's convert/granted/waiting queues
1374    remove_lock -- used for unlock, removes lkb from granted
1375    revert_lock -- used for cancel, moves lkb from convert to granted
1376    grant_lock  -- used for request and convert, adds lkb to granted or
1377                   moves lkb from convert or waiting to granted
1378 
1379    Each of these is used for master or local copy lkb's.  There is
1380    also a _pc() variation used to make the corresponding change on
1381    a process copy (pc) lkb. */
1382 
1383 static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1384 {
1385 	del_lkb(r, lkb);
1386 	lkb->lkb_grmode = DLM_LOCK_IV;
1387 	/* this unhold undoes the original ref from create_lkb()
1388 	   so this leads to the lkb being freed */
1389 	unhold_lkb(lkb);
1390 }
1391 
1392 static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1393 {
1394 	set_lvb_unlock(r, lkb);
1395 	_remove_lock(r, lkb);
1396 }
1397 
1398 static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1399 {
1400 	_remove_lock(r, lkb);
1401 }
1402 
1403 /* returns: 0 did nothing
1404 	    1 moved lock to granted
1405 	   -1 removed lock */
1406 
1407 static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1408 {
1409 	int rv = 0;
1410 
1411 	lkb->lkb_rqmode = DLM_LOCK_IV;
1412 
1413 	switch (lkb->lkb_status) {
1414 	case DLM_LKSTS_GRANTED:
1415 		break;
1416 	case DLM_LKSTS_CONVERT:
1417 		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1418 		rv = 1;
1419 		break;
1420 	case DLM_LKSTS_WAITING:
1421 		del_lkb(r, lkb);
1422 		lkb->lkb_grmode = DLM_LOCK_IV;
1423 		/* this unhold undoes the original ref from create_lkb()
1424 		   so this leads to the lkb being freed */
1425 		unhold_lkb(lkb);
1426 		rv = -1;
1427 		break;
1428 	default:
1429 		log_print("invalid status for revert %d", lkb->lkb_status);
1430 	}
1431 	return rv;
1432 }
1433 
1434 static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
1435 {
1436 	return revert_lock(r, lkb);
1437 }
1438 
1439 static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1440 {
1441 	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1442 		lkb->lkb_grmode = lkb->lkb_rqmode;
1443 		if (lkb->lkb_status)
1444 			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1445 		else
1446 			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1447 	}
1448 
1449 	lkb->lkb_rqmode = DLM_LOCK_IV;
1450 }
1451 
1452 static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1453 {
1454 	set_lvb_lock(r, lkb);
1455 	_grant_lock(r, lkb);
1456 	lkb->lkb_highbast = 0;
1457 }
1458 
1459 static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1460 			  struct dlm_message *ms)
1461 {
1462 	set_lvb_lock_pc(r, lkb, ms);
1463 	_grant_lock(r, lkb);
1464 }
1465 
1466 /* called by grant_pending_locks() which means an async grant message must
1467    be sent to the requesting node in addition to granting the lock if the
1468    lkb belongs to a remote node. */
1469 
1470 static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
1471 {
1472 	grant_lock(r, lkb);
1473 	if (is_master_copy(lkb))
1474 		send_grant(r, lkb);
1475 	else
1476 		queue_cast(r, lkb, 0);
1477 }
1478 
1479 /* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1480    change the granted/requested modes.  We're munging things accordingly in
1481    the process copy.
1482    CONVDEADLK: our grmode may have been forced down to NL to resolve a
1483    conversion deadlock
1484    ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1485    compatible with other granted locks */
1486 
1487 static void munge_demoted(struct dlm_lkb *lkb)
1488 {
1489 	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1490 		log_print("munge_demoted %x invalid modes gr %d rq %d",
1491 			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1492 		return;
1493 	}
1494 
1495 	lkb->lkb_grmode = DLM_LOCK_NL;
1496 }
1497 
1498 static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1499 {
1500 	if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1501 	    ms->m_type != DLM_MSG_GRANT) {
1502 		log_print("munge_altmode %x invalid reply type %d",
1503 			  lkb->lkb_id, ms->m_type);
1504 		return;
1505 	}
1506 
1507 	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1508 		lkb->lkb_rqmode = DLM_LOCK_PR;
1509 	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1510 		lkb->lkb_rqmode = DLM_LOCK_CW;
1511 	else {
1512 		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1513 		dlm_print_lkb(lkb);
1514 	}
1515 }
1516 
1517 static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1518 {
1519 	struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1520 					   lkb_statequeue);
1521 	if (lkb->lkb_id == first->lkb_id)
1522 		return 1;
1523 
1524 	return 0;
1525 }
1526 
1527 /* Check if the given lkb conflicts with another lkb on the queue. */
1528 
1529 static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1530 {
1531 	struct dlm_lkb *this;
1532 
1533 	list_for_each_entry(this, head, lkb_statequeue) {
1534 		if (this == lkb)
1535 			continue;
1536 		if (!modes_compat(this, lkb))
1537 			return 1;
1538 	}
1539 	return 0;
1540 }
1541 
1542 /*
1543  * "A conversion deadlock arises with a pair of lock requests in the converting
1544  * queue for one resource.  The granted mode of each lock blocks the requested
1545  * mode of the other lock."
1546  *
1547  * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1548  * convert queue from being granted, then deadlk/demote lkb.
1549  *
1550  * Example:
1551  * Granted Queue: empty
1552  * Convert Queue: NL->EX (first lock)
1553  *                PR->EX (second lock)
1554  *
1555  * The first lock can't be granted because of the granted mode of the second
1556  * lock and the second lock can't be granted because it's not first in the
1557  * list.  We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1558  * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1559  * flag set and return DEMOTED in the lksb flags.
1560  *
1561  * Originally, this function detected conv-deadlk in a more limited scope:
1562  * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1563  * - if lkb1 was the first entry in the queue (not just earlier), and was
1564  *   blocked by the granted mode of lkb2, and there was nothing on the
1565  *   granted queue preventing lkb1 from being granted immediately, i.e.
1566  *   lkb2 was the only thing preventing lkb1 from being granted.
1567  *
1568  * That second condition meant we'd only say there was conv-deadlk if
1569  * resolving it (by demotion) would lead to the first lock on the convert
1570  * queue being granted right away.  It allowed conversion deadlocks to exist
1571  * between locks on the convert queue while they couldn't be granted anyway.
1572  *
1573  * Now, we detect and take action on conversion deadlocks immediately when
1574  * they're created, even if they may not be immediately consequential.  If
1575  * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1576  * mode that would prevent lkb1's conversion from being granted, we do a
1577  * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1578  * I think this means that the lkb_is_ahead condition below should always
1579  * be zero, i.e. there will never be conv-deadlk between two locks that are
1580  * both already on the convert queue.
1581  */
1582 
1583 static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
1584 {
1585 	struct dlm_lkb *lkb1;
1586 	int lkb_is_ahead = 0;
1587 
1588 	list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1589 		if (lkb1 == lkb2) {
1590 			lkb_is_ahead = 1;
1591 			continue;
1592 		}
1593 
1594 		if (!lkb_is_ahead) {
1595 			if (!modes_compat(lkb2, lkb1))
1596 				return 1;
1597 		} else {
1598 			if (!modes_compat(lkb2, lkb1) &&
1599 			    !modes_compat(lkb1, lkb2))
1600 				return 1;
1601 		}
1602 	}
1603 	return 0;
1604 }
1605 
1606 /*
1607  * Return 1 if the lock can be granted, 0 otherwise.
1608  * Also detect and resolve conversion deadlocks.
1609  *
1610  * lkb is the lock to be granted
1611  *
1612  * now is 1 if the function is being called in the context of the
1613  * immediate request, it is 0 if called later, after the lock has been
1614  * queued.
1615  *
1616  * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1617  */
1618 
1619 static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1620 {
1621 	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1622 
1623 	/*
1624 	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
1625 	 * a new request for a NL mode lock being blocked.
1626 	 *
1627 	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1628 	 * request, then it would be granted.  In essence, the use of this flag
1629 	 * tells the Lock Manager to expedite this request by not considering
1630 	 * what may be in the CONVERTING or WAITING queues...  As of this
1631 	 * writing, the EXPEDITE flag can be used only with new requests for NL
1632 	 * mode locks.  This flag is not valid for conversion requests.
1633 	 *
1634 	 * A shortcut.  Earlier checks return an error if EXPEDITE is used in a
1635 	 * conversion or used with a non-NL requested mode.  We also know an
1636 	 * EXPEDITE request is always granted immediately, so now must always
1637 	 * be 1.  The full condition to grant an expedite request: (now &&
1638 	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1639 	 * therefore be shortened to just checking the flag.
1640 	 */
1641 
1642 	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
1643 		return 1;
1644 
1645 	/*
1646 	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1647 	 * added to the remaining conditions.
1648 	 */
1649 
1650 	if (queue_conflict(&r->res_grantqueue, lkb))
1651 		goto out;
1652 
1653 	/*
1654 	 * 6-3: By default, a conversion request is immediately granted if the
1655 	 * requested mode is compatible with the modes of all other granted
1656 	 * locks
1657 	 */
1658 
1659 	if (queue_conflict(&r->res_convertqueue, lkb))
1660 		goto out;
1661 
1662 	/*
1663 	 * 6-5: But the default algorithm for deciding whether to grant or
1664 	 * queue conversion requests does not by itself guarantee that such
1665 	 * requests are serviced on a "first come first serve" basis.  This, in
1666 	 * turn, can lead to a phenomenon known as "indefinite postponement".
1667 	 *
1668 	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
1669 	 * the system service employed to request a lock conversion.  This flag
1670 	 * forces certain conversion requests to be queued, even if they are
1671 	 * compatible with the granted modes of other locks on the same
1672 	 * resource.  Thus, the use of this flag results in conversion requests
1673 	 * being ordered on a "first come first serve" basis.
1674 	 *
1675 	 * DCT: This condition is all about new conversions being able to occur
1676 	 * "in place" while the lock remains on the granted queue (assuming
1677 	 * nothing else conflicts.)  IOW if QUECVT isn't set, a conversion
1678 	 * doesn't _have_ to go onto the convert queue where it's processed in
1679 	 * order.  The "now" variable is necessary to distinguish converts
1680 	 * being received and processed for the first time now, because once a
1681 	 * convert is moved to the conversion queue the condition below applies
1682 	 * requiring fifo granting.
1683 	 */
1684 
1685 	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
1686 		return 1;
1687 
1688 	/*
1689 	 * The NOORDER flag is set to avoid the standard vms rules on grant
1690 	 * order.
1691 	 */
1692 
1693 	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
1694 		return 1;
1695 
1696 	/*
1697 	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1698 	 * granted until all other conversion requests ahead of it are granted
1699 	 * and/or canceled.
1700 	 */
1701 
1702 	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
1703 		return 1;
1704 
1705 	/*
1706 	 * 6-4: By default, a new request is immediately granted only if all
1707 	 * three of the following conditions are satisfied when the request is
1708 	 * issued:
1709 	 * - The queue of ungranted conversion requests for the resource is
1710 	 *   empty.
1711 	 * - The queue of ungranted new requests for the resource is empty.
1712 	 * - The mode of the new request is compatible with the most
1713 	 *   restrictive mode of all granted locks on the resource.
1714 	 */
1715 
1716 	if (now && !conv && list_empty(&r->res_convertqueue) &&
1717 	    list_empty(&r->res_waitqueue))
1718 		return 1;
1719 
1720 	/*
1721 	 * 6-4: Once a lock request is in the queue of ungranted new requests,
1722 	 * it cannot be granted until the queue of ungranted conversion
1723 	 * requests is empty, all ungranted new requests ahead of it are
1724 	 * granted and/or canceled, and it is compatible with the granted mode
1725 	 * of the most restrictive lock granted on the resource.
1726 	 */
1727 
1728 	if (!now && !conv && list_empty(&r->res_convertqueue) &&
1729 	    first_in_list(lkb, &r->res_waitqueue))
1730 		return 1;
1731  out:
1732 	return 0;
1733 }
1734 
1735 static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1736 			  int *err)
1737 {
1738 	int rv;
1739 	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
1740 	int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1741 
1742 	if (err)
1743 		*err = 0;
1744 
1745 	rv = _can_be_granted(r, lkb, now);
1746 	if (rv)
1747 		goto out;
1748 
1749 	/*
1750 	 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1751 	 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1752 	 * cancels one of the locks.
1753 	 */
1754 
1755 	if (is_convert && can_be_queued(lkb) &&
1756 	    conversion_deadlock_detect(r, lkb)) {
1757 		if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1758 			lkb->lkb_grmode = DLM_LOCK_NL;
1759 			lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1760 		} else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1761 			if (err)
1762 				*err = -EDEADLK;
1763 			else {
1764 				log_print("can_be_granted deadlock %x now %d",
1765 					  lkb->lkb_id, now);
1766 				dlm_dump_rsb(r);
1767 			}
1768 		}
1769 		goto out;
1770 	}
1771 
1772 	/*
1773 	 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1774 	 * to grant a request in a mode other than the normal rqmode.  It's a
1775 	 * simple way to provide a big optimization to applications that can
1776 	 * use them.
1777 	 */
1778 
1779 	if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
1780 		alt = DLM_LOCK_PR;
1781 	else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
1782 		alt = DLM_LOCK_CW;
1783 
1784 	if (alt) {
1785 		lkb->lkb_rqmode = alt;
1786 		rv = _can_be_granted(r, lkb, now);
1787 		if (rv)
1788 			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1789 		else
1790 			lkb->lkb_rqmode = rqmode;
1791 	}
1792  out:
1793 	return rv;
1794 }
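
/* Example of the ALT path above (illustrative): a request for DLM_LOCK_EX
   made with DLM_LKF_ALTPR set cannot be granted in EX, but PR would be
   compatible.  _can_be_granted() is retried with rqmode PR; on success the
   lock is granted in PR and DLM_SBF_ALTMODE is returned in the lksb flags
   so the caller can detect the substituted mode. */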
1795 
1796 /* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1797    for locks pending on the convert list.  Once verified (watch for these
1798    log_prints), we should be able to just call _can_be_granted() and not
1799    bother with the demote/deadlk cases here (and there's no easy way to deal
1800    with a deadlk here, we'd have to generate something like grant_lock with
1801    the deadlk error.) */
1802 
1803 /* Returns the highest requested mode of all blocked conversions; sets
1804    cw if there's a blocked conversion to DLM_LOCK_CW. */
1805 
1806 static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
1807 {
1808 	struct dlm_lkb *lkb, *s;
1809 	int hi, demoted, quit, grant_restart, demote_restart;
1810 	int deadlk;
1811 
1812 	quit = 0;
1813  restart:
1814 	grant_restart = 0;
1815 	demote_restart = 0;
1816 	hi = DLM_LOCK_IV;
1817 
1818 	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1819 		demoted = is_demoted(lkb);
1820 		deadlk = 0;
1821 
1822 		if (can_be_granted(r, lkb, 0, &deadlk)) {
1823 			grant_lock_pending(r, lkb);
1824 			grant_restart = 1;
1825 			continue;
1826 		}
1827 
1828 		if (!demoted && is_demoted(lkb)) {
1829 			log_print("WARN: pending demoted %x node %d %s",
1830 				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1831 			demote_restart = 1;
1832 			continue;
1833 		}
1834 
1835 		if (deadlk) {
1836 			log_print("WARN: pending deadlock %x node %d %s",
1837 				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1838 			dlm_dump_rsb(r);
1839 			continue;
1840 		}
1841 
1842 		hi = max_t(int, lkb->lkb_rqmode, hi);
1843 
1844 		if (cw && lkb->lkb_rqmode == DLM_LOCK_CW)
1845 			*cw = 1;
1846 	}
1847 
1848 	if (grant_restart)
1849 		goto restart;
1850 	if (demote_restart && !quit) {
1851 		quit = 1;
1852 		goto restart;
1853 	}
1854 
1855 	return max_t(int, high, hi);
1856 }
1857 
1858 static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw)
1859 {
1860 	struct dlm_lkb *lkb, *s;
1861 
1862 	list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
1863 		if (can_be_granted(r, lkb, 0, NULL))
1864 			grant_lock_pending(r, lkb);
1865 		else {
1866 			high = max_t(int, lkb->lkb_rqmode, high);
1867 			if (lkb->lkb_rqmode == DLM_LOCK_CW)
1868 				*cw = 1;
1869 		}
1870 	}
1871 
1872 	return high;
1873 }
1874 
1875 /* cw of 1 means there's a lock with a rqmode of DLM_LOCK_CW that's blocked
1876    on either the convert or waiting queue.
1877    high is the largest rqmode of all locks blocked on the convert or
1878    waiting queue. */
1879 
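/* The explicit grmode == PR && cw test below exists because CW (2) is
   numerically lower than PR (3) even though the two modes are mutually
   incompatible: with a blocked CW request, high may still be PR, which
   a granted PR lock is compatible with, so the generic matrix check
   alone would never send the needed bast. */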
1880 static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
1881 {
1882 	if (gr->lkb_grmode == DLM_LOCK_PR && cw) {
1883 		if (gr->lkb_highbast < DLM_LOCK_EX)
1884 			return 1;
1885 		return 0;
1886 	}
1887 
1888 	if (gr->lkb_highbast < high &&
1889 	    !__dlm_compat_matrix[gr->lkb_grmode+1][high+1])
1890 		return 1;
1891 	return 0;
1892 }
1893 
1894 static void grant_pending_locks(struct dlm_rsb *r)
1895 {
1896 	struct dlm_lkb *lkb, *s;
1897 	int high = DLM_LOCK_IV;
1898 	int cw = 0;
1899 
1900 	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
1901 
1902 	high = grant_pending_convert(r, high, &cw);
1903 	high = grant_pending_wait(r, high, &cw);
1904 
1905 	if (high == DLM_LOCK_IV)
1906 		return;
1907 
1908 	/*
1909 	 * If there are locks left on the wait/convert queue then send blocking
1910 	 * ASTs to granted locks based on the largest requested mode (high)
1911 	 * found above.
1912 	 */
1913 
1914 	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1915 		if (lkb->lkb_bastfn && lock_requires_bast(lkb, high, cw)) {
1916 			if (cw && high == DLM_LOCK_PR &&
1917 			    lkb->lkb_grmode == DLM_LOCK_PR)
1918 				queue_bast(r, lkb, DLM_LOCK_CW);
1919 			else
1920 				queue_bast(r, lkb, high);
1921 			lkb->lkb_highbast = high;
1922 		}
1923 	}
1924 }
1925 
1926 static int modes_require_bast(struct dlm_lkb *gr, struct dlm_lkb *rq)
1927 {
1928 	if ((gr->lkb_grmode == DLM_LOCK_PR && rq->lkb_rqmode == DLM_LOCK_CW) ||
1929 	    (gr->lkb_grmode == DLM_LOCK_CW && rq->lkb_rqmode == DLM_LOCK_PR)) {
1930 		if (gr->lkb_highbast < DLM_LOCK_EX)
1931 			return 1;
1932 		return 0;
1933 	}
1934 
1935 	if (gr->lkb_highbast < rq->lkb_rqmode && !modes_compat(gr, rq))
1936 		return 1;
1937 	return 0;
1938 }
1939 
1940 static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1941 			    struct dlm_lkb *lkb)
1942 {
1943 	struct dlm_lkb *gr;
1944 
1945 	list_for_each_entry(gr, head, lkb_statequeue) {
1946 		/* skip self when sending basts to convertqueue */
1947 		if (gr == lkb)
1948 			continue;
1949 		if (gr->lkb_bastfn && modes_require_bast(gr, lkb)) {
1950 			queue_bast(r, gr, lkb->lkb_rqmode);
1951 			gr->lkb_highbast = lkb->lkb_rqmode;
1952 		}
1953 	}
1954 }
1955 
1956 static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1957 {
1958 	send_bast_queue(r, &r->res_grantqueue, lkb);
1959 }
1960 
1961 static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1962 {
1963 	send_bast_queue(r, &r->res_grantqueue, lkb);
1964 	send_bast_queue(r, &r->res_convertqueue, lkb);
1965 }
1966 
1967 /* set_master(r, lkb) -- set the master nodeid of a resource
1968 
1969    The purpose of this function is to set the nodeid field in the given
1970    lkb using the nodeid field in the given rsb.  If the rsb's nodeid is
1971    known, it can just be copied to the lkb and the function will return
1972    0.  If the rsb's nodeid is _not_ known, it needs to be looked up
1973    before it can be copied to the lkb.
1974 
1975    When the rsb nodeid is being looked up remotely, the initial lkb
1976    causing the lookup is kept on the ls_waiters list waiting for the
1977    lookup reply.  Other lkb's waiting for the same rsb lookup are kept
1978    on the rsb's res_lookup list until the master is verified.
1979 
1980    Return values:
1981    0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1982    1: the rsb master is not available and the lkb has been placed on
1983       a wait queue
1984 */
1985 
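/* res_nodeid encoding used below: 0 means this node is the master, a
   positive value is the nodeid of a remote master, and -1 means the
   master is unknown and must be found through the directory. */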
1986 static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1987 {
1988 	struct dlm_ls *ls = r->res_ls;
1989 	int i, error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1990 
1991 	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1992 		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1993 		r->res_first_lkid = lkb->lkb_id;
1994 		lkb->lkb_nodeid = r->res_nodeid;
1995 		return 0;
1996 	}
1997 
1998 	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1999 		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
2000 		return 1;
2001 	}
2002 
2003 	if (r->res_nodeid == 0) {
2004 		lkb->lkb_nodeid = 0;
2005 		return 0;
2006 	}
2007 
2008 	if (r->res_nodeid > 0) {
2009 		lkb->lkb_nodeid = r->res_nodeid;
2010 		return 0;
2011 	}
2012 
2013 	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
2014 
2015 	dir_nodeid = dlm_dir_nodeid(r);
2016 
2017 	if (dir_nodeid != our_nodeid) {
2018 		r->res_first_lkid = lkb->lkb_id;
2019 		send_lookup(r, lkb);
2020 		return 1;
2021 	}
2022 
2023 	for (i = 0; i < 2; i++) {
2024 		/* It's possible for dlm_scand to remove an old rsb for
2025 		   this same resource from the toss list while we create
2026 		   a new one, look up the master locally, and find that
2027 		   the entry already exists just before dlm_scand does
2028 		   the dir_remove() on the previous rsb. */
2029 
2030 		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
2031 				       r->res_length, &ret_nodeid);
2032 		if (!error)
2033 			break;
2034 		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
2035 		schedule();
2036 	}
2037 	if (error && error != -EEXIST)
2038 		return error;
2039 
2040 	if (ret_nodeid == our_nodeid) {
2041 		r->res_first_lkid = 0;
2042 		r->res_nodeid = 0;
2043 		lkb->lkb_nodeid = 0;
2044 	} else {
2045 		r->res_first_lkid = lkb->lkb_id;
2046 		r->res_nodeid = ret_nodeid;
2047 		lkb->lkb_nodeid = ret_nodeid;
2048 	}
2049 	return 0;
2050 }
2051 
2052 static void process_lookup_list(struct dlm_rsb *r)
2053 {
2054 	struct dlm_lkb *lkb, *safe;
2055 
2056 	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
2057 		list_del_init(&lkb->lkb_rsb_lookup);
2058 		_request_lock(r, lkb);
2059 		schedule();
2060 	}
2061 }
2062 
2063 /* confirm_master -- confirm (or deny) an rsb's master nodeid */
2064 
2065 static void confirm_master(struct dlm_rsb *r, int error)
2066 {
2067 	struct dlm_lkb *lkb;
2068 
2069 	if (!r->res_first_lkid)
2070 		return;
2071 
2072 	switch (error) {
2073 	case 0:
2074 	case -EINPROGRESS:
2075 		r->res_first_lkid = 0;
2076 		process_lookup_list(r);
2077 		break;
2078 
2079 	case -EAGAIN:
2080 	case -EBADR:
2081 	case -ENOTBLK:
2082 		/* the remote request failed and won't be retried (it was
2083 		   a NOQUEUE, or has been canceled/unlocked); make a waiting
2084 		   lkb the first_lkid */
2085 
2086 		r->res_first_lkid = 0;
2087 
2088 		if (!list_empty(&r->res_lookup)) {
2089 			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
2090 					 lkb_rsb_lookup);
2091 			list_del_init(&lkb->lkb_rsb_lookup);
2092 			r->res_first_lkid = lkb->lkb_id;
2093 			_request_lock(r, lkb);
2094 		}
2095 		break;
2096 
2097 	default:
2098 		log_error(r->res_ls, "confirm_master unknown error %d", error);
2099 	}
2100 }
2101 
2102 static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
2103 			 int namelen, unsigned long timeout_cs,
2104 			 void (*ast) (void *astparam),
2105 			 void *astparam,
2106 			 void (*bast) (void *astparam, int mode),
2107 			 struct dlm_args *args)
2108 {
2109 	int rv = -EINVAL;
2110 
2111 	/* check for invalid arg usage */
2112 
2113 	if (mode < 0 || mode > DLM_LOCK_EX)
2114 		goto out;
2115 
2116 	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
2117 		goto out;
2118 
2119 	if (flags & DLM_LKF_CANCEL)
2120 		goto out;
2121 
2122 	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
2123 		goto out;
2124 
2125 	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
2126 		goto out;
2127 
2128 	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
2129 		goto out;
2130 
2131 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
2132 		goto out;
2133 
2134 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
2135 		goto out;
2136 
2137 	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
2138 		goto out;
2139 
2140 	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
2141 		goto out;
2142 
2143 	if (!ast || !lksb)
2144 		goto out;
2145 
2146 	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
2147 		goto out;
2148 
2149 	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
2150 		goto out;
2151 
2152 	/* these args will be copied to the lkb in validate_lock_args,
2153 	   it cannot be done now because when converting locks, fields in
2154 	   an active lkb cannot be modified before locking the rsb */
2155 
2156 	args->flags = flags;
2157 	args->astfn = ast;
2158 	args->astparam = astparam;
2159 	args->bastfn = bast;
2160 	args->timeout = timeout_cs;
2161 	args->mode = mode;
2162 	args->lksb = lksb;
2163 	rv = 0;
2164  out:
2165 	return rv;
2166 }
2167 
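/* Examples of the checks above: dlm_lock(ls, DLM_LOCK_NL, &lksb,
   DLM_LKF_EXPEDITE, ...) passes, since EXPEDITE requires mode NL;
   combining DLM_LKF_EXPEDITE with DLM_LKF_NOQUEUE, or using it with
   any mode other than NL, fails with -EINVAL, as does a DLM_LKF_CONVERT
   request that doesn't carry the existing lkid in lksb->sb_lkid. */
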
2168 static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
2169 {
2170 	if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
2171 		      DLM_LKF_FORCEUNLOCK))
2172 		return -EINVAL;
2173 
2174 	if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
2175 		return -EINVAL;
2176 
2177 	args->flags = flags;
2178 	args->astparam = astarg;
2179 	return 0;
2180 }
2181 
2182 static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2183 			      struct dlm_args *args)
2184 {
2185 	int rv = -EINVAL;
2186 
2187 	if (args->flags & DLM_LKF_CONVERT) {
2188 		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2189 			goto out;
2190 
2191 		if (args->flags & DLM_LKF_QUECVT &&
2192 		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2193 			goto out;
2194 
2195 		rv = -EBUSY;
2196 		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2197 			goto out;
2198 
2199 		if (lkb->lkb_wait_type)
2200 			goto out;
2201 
2202 		if (is_overlap(lkb))
2203 			goto out;
2204 	}
2205 
2206 	lkb->lkb_exflags = args->flags;
2207 	lkb->lkb_sbflags = 0;
2208 	lkb->lkb_astfn = args->astfn;
2209 	lkb->lkb_astparam = args->astparam;
2210 	lkb->lkb_bastfn = args->bastfn;
2211 	lkb->lkb_rqmode = args->mode;
2212 	lkb->lkb_lksb = args->lksb;
2213 	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2214 	lkb->lkb_ownpid = (int) current->pid;
2215 	lkb->lkb_timeout_cs = args->timeout;
2216 	rv = 0;
2217  out:
2218 	if (rv)
2219 		log_debug(ls, "validate_lock_args %d %x %x %x %d %d %s",
2220 			  rv, lkb->lkb_id, lkb->lkb_flags, args->flags,
2221 			  lkb->lkb_status, lkb->lkb_wait_type,
2222 			  lkb->lkb_resource->res_name);
2223 	return rv;
2224 }
2225 
2226 /* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2227    for success */
2228 
2229 /* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2230    because there may be a lookup in progress and it's valid to do a
2231    cancel or force-unlock on it */
2232 
2233 static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2234 {
2235 	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
2236 	int rv = -EINVAL;
2237 
2238 	if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2239 		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2240 		dlm_print_lkb(lkb);
2241 		goto out;
2242 	}
2243 
2244 	/* an lkb may still exist even though the lock is EOL'ed due to a
2245 	   cancel, unlock or failed noqueue request; an app can't use these
2246 	   locks; return same error as if the lkid had not been found at all */
2247 
2248 	if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2249 		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2250 		rv = -ENOENT;
2251 		goto out;
2252 	}
2253 
2254 	/* an lkb may be waiting for an rsb lookup to complete where the
2255 	   lookup was initiated by another lock */
2256 
2257 	if (!list_empty(&lkb->lkb_rsb_lookup)) {
2258 		if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2259 			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2260 			list_del_init(&lkb->lkb_rsb_lookup);
2261 			queue_cast(lkb->lkb_resource, lkb,
2262 				   args->flags & DLM_LKF_CANCEL ?
2263 				   -DLM_ECANCEL : -DLM_EUNLOCK);
2264 			unhold_lkb(lkb); /* undoes create_lkb() */
2265 		}
2266 		/* caller changes -EBUSY to 0 for CANCEL and FORCEUNLOCK */
2267 		rv = -EBUSY;
2268 		goto out;
2269 	}
2270 
2271 	/* cancel not allowed with another cancel/unlock in progress */
2272 
2273 	if (args->flags & DLM_LKF_CANCEL) {
2274 		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2275 			goto out;
2276 
2277 		if (is_overlap(lkb))
2278 			goto out;
2279 
2280 		/* don't let scand try to do a cancel */
2281 		del_timeout(lkb);
2282 
2283 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
2284 			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2285 			rv = -EBUSY;
2286 			goto out;
2287 		}
2288 
2289 		/* there's nothing to cancel */
2290 		if (lkb->lkb_status == DLM_LKSTS_GRANTED &&
2291 		    !lkb->lkb_wait_type) {
2292 			rv = -EBUSY;
2293 			goto out;
2294 		}
2295 
2296 		switch (lkb->lkb_wait_type) {
2297 		case DLM_MSG_LOOKUP:
2298 		case DLM_MSG_REQUEST:
2299 			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2300 			rv = -EBUSY;
2301 			goto out;
2302 		case DLM_MSG_UNLOCK:
2303 		case DLM_MSG_CANCEL:
2304 			goto out;
2305 		}
2306 		/* add_to_waiters() will set OVERLAP_CANCEL */
2307 		goto out_ok;
2308 	}
2309 
2310 	/* do we need to allow a force-unlock if there's a normal unlock
2311 	   already in progress?  in what conditions could the normal unlock
2312 	   fail such that we'd want to send a force-unlock to be sure? */
2313 
2314 	if (args->flags & DLM_LKF_FORCEUNLOCK) {
2315 		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2316 			goto out;
2317 
2318 		if (is_overlap_unlock(lkb))
2319 			goto out;
2320 
2321 		/* don't let scand try to do a cancel */
2322 		del_timeout(lkb);
2323 
2324 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
2325 			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2326 			rv = -EBUSY;
2327 			goto out;
2328 		}
2329 
2330 		switch (lkb->lkb_wait_type) {
2331 		case DLM_MSG_LOOKUP:
2332 		case DLM_MSG_REQUEST:
2333 			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2334 			rv = -EBUSY;
2335 			goto out;
2336 		case DLM_MSG_UNLOCK:
2337 			goto out;
2338 		}
2339 		/* add_to_waiters() will set OVERLAP_UNLOCK */
2340 		goto out_ok;
2341 	}
2342 
2343 	/* normal unlock not allowed if there's any op in progress */
2344 	rv = -EBUSY;
2345 	if (lkb->lkb_wait_type || lkb->lkb_wait_count)
2346 		goto out;
2347 
2348  out_ok:
2349 	/* an overlapping op shouldn't blow away exflags from other op */
2350 	lkb->lkb_exflags |= args->flags;
2351 	lkb->lkb_sbflags = 0;
2352 	lkb->lkb_astparam = args->astparam;
2353 	rv = 0;
2354  out:
2355 	if (rv)
2356 		log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2357 			  lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2358 			  args->flags, lkb->lkb_wait_type,
2359 			  lkb->lkb_resource->res_name);
2360 	return rv;
2361 }
2362 
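/* A typical overlap case handled above: a request is still waiting for
   the master's reply (lkb_wait_type == DLM_MSG_REQUEST) when the caller
   issues dlm_unlock() with DLM_LKF_CANCEL.  The cancel can't run yet,
   so DLM_IFL_OVERLAP_CANCEL is set and -EBUSY returned (which dlm_unlock
   maps to 0); if the request reply then reports -EINPROGRESS, the
   deferred cancel is sent from receive_request_reply(). */
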
2363 /*
2364  * Four stage 4 varieties:
2365  * do_request(), do_convert(), do_unlock(), do_cancel()
2366  * These are called on the master node for the given lock and
2367  * from the central locking logic.
2368  */
2369 
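/* For a remote operation, the value returned by do_xxxx() is also the
   m_result carried back in the corresponding reply message; see how
   receive_request() feeds do_request()'s result to send_request_reply()
   below. */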
2370 static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2371 {
2372 	int error = 0;
2373 
2374 	if (can_be_granted(r, lkb, 1, NULL)) {
2375 		grant_lock(r, lkb);
2376 		queue_cast(r, lkb, 0);
2377 		goto out;
2378 	}
2379 
2380 	if (can_be_queued(lkb)) {
2381 		error = -EINPROGRESS;
2382 		add_lkb(r, lkb, DLM_LKSTS_WAITING);
2383 		add_timeout(lkb);
2384 		goto out;
2385 	}
2386 
2387 	error = -EAGAIN;
2388 	queue_cast(r, lkb, -EAGAIN);
2389  out:
2390 	return error;
2391 }
2392 
2393 static void do_request_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2394 			       int error)
2395 {
2396 	switch (error) {
2397 	case -EAGAIN:
2398 		if (force_blocking_asts(lkb))
2399 			send_blocking_asts_all(r, lkb);
2400 		break;
2401 	case -EINPROGRESS:
2402 		send_blocking_asts(r, lkb);
2403 		break;
2404 	}
2405 }
2406 
2407 static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2408 {
2409 	int error = 0;
2410 	int deadlk = 0;
2411 
2412 	/* changing an existing lock may allow others to be granted */
2413 
2414 	if (can_be_granted(r, lkb, 1, &deadlk)) {
2415 		grant_lock(r, lkb);
2416 		queue_cast(r, lkb, 0);
2417 		goto out;
2418 	}
2419 
2420 	/* can_be_granted() detected that this lock would block in a conversion
2421 	   deadlock, so we leave it on the granted queue and return EDEADLK in
2422 	   the ast for the convert. */
2423 
2424 	if (deadlk) {
2425 		/* it's left on the granted queue */
2426 		revert_lock(r, lkb);
2427 		queue_cast(r, lkb, -EDEADLK);
2428 		error = -EDEADLK;
2429 		goto out;
2430 	}
2431 
2432 	/* is_demoted() means the can_be_granted() above set the grmode
2433 	   to NL, and left us on the granted queue.  This auto-demotion
2434 	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
2435 	   now grantable.  We have to try to grant other converting locks
2436 	   before we try again to grant this one. */
2437 
2438 	if (is_demoted(lkb)) {
2439 		grant_pending_convert(r, DLM_LOCK_IV, NULL);
2440 		if (_can_be_granted(r, lkb, 1)) {
2441 			grant_lock(r, lkb);
2442 			queue_cast(r, lkb, 0);
2443 			goto out;
2444 		}
2445 		/* else fall through and move to convert queue */
2446 	}
2447 
2448 	if (can_be_queued(lkb)) {
2449 		error = -EINPROGRESS;
2450 		del_lkb(r, lkb);
2451 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2452 		add_timeout(lkb);
2453 		goto out;
2454 	}
2455 
2456 	error = -EAGAIN;
2457 	queue_cast(r, lkb, -EAGAIN);
2458  out:
2459 	return error;
2460 }
2461 
2462 static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2463 			       int error)
2464 {
2465 	switch (error) {
2466 	case 0:
2467 		grant_pending_locks(r);
2468 		/* grant_pending_locks also sends basts */
2469 		break;
2470 	case -EAGAIN:
2471 		if (force_blocking_asts(lkb))
2472 			send_blocking_asts_all(r, lkb);
2473 		break;
2474 	case -EINPROGRESS:
2475 		send_blocking_asts(r, lkb);
2476 		break;
2477 	}
2478 }
2479 
2480 static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2481 {
2482 	remove_lock(r, lkb);
2483 	queue_cast(r, lkb, -DLM_EUNLOCK);
2484 	return -DLM_EUNLOCK;
2485 }
2486 
2487 static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2488 			      int error)
2489 {
2490 	grant_pending_locks(r);
2491 }
2492 
2493 /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
2494 
2495 static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2496 {
2497 	int error;
2498 
2499 	error = revert_lock(r, lkb);
2500 	if (error) {
2501 		queue_cast(r, lkb, -DLM_ECANCEL);
2502 		return -DLM_ECANCEL;
2503 	}
2504 	return 0;
2505 }
2506 
2507 static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
2508 			      int error)
2509 {
2510 	if (error)
2511 		grant_pending_locks(r);
2512 }
2513 
2514 /*
2515  * Four stage 3 varieties:
2516  * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2517  */
2518 
2519 /* add a new lkb to a possibly new rsb, called by requesting process */
2520 
2521 static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2522 {
2523 	int error;
2524 
2525 	/* set_master: sets lkb nodeid from r */
2526 
2527 	error = set_master(r, lkb);
2528 	if (error < 0)
2529 		goto out;
2530 	if (error) {
2531 		error = 0;
2532 		goto out;
2533 	}
2534 
2535 	if (is_remote(r)) {
2536 		/* receive_request() calls do_request() on remote node */
2537 		error = send_request(r, lkb);
2538 	} else {
2539 		error = do_request(r, lkb);
2540 		/* for remote locks the request_reply is sent
2541 		   between do_request and do_request_effects */
2542 		do_request_effects(r, lkb, error);
2543 	}
2544  out:
2545 	return error;
2546 }
2547 
2548 /* change some property of an existing lkb, e.g. mode */
2549 
2550 static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2551 {
2552 	int error;
2553 
2554 	if (is_remote(r)) {
2555 		/* receive_convert() calls do_convert() on remote node */
2556 		error = send_convert(r, lkb);
2557 	} else {
2558 		error = do_convert(r, lkb);
2559 		/* for remote locks the convert_reply is sent
2560 		   between do_convert and do_convert_effects */
2561 		do_convert_effects(r, lkb, error);
2562 	}
2563 
2564 	return error;
2565 }
2566 
2567 /* remove an existing lkb from the granted queue */
2568 
2569 static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2570 {
2571 	int error;
2572 
2573 	if (is_remote(r)) {
2574 		/* receive_unlock() calls do_unlock() on remote node */
2575 		error = send_unlock(r, lkb);
2576 	} else {
2577 		error = do_unlock(r, lkb);
2578 		/* for remote locks the unlock_reply is sent
2579 		   between do_unlock and do_unlock_effects */
2580 		do_unlock_effects(r, lkb, error);
2581 	}
2582 
2583 	return error;
2584 }
2585 
2586 /* remove an existing lkb from the convert or wait queue */
2587 
2588 static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2589 {
2590 	int error;
2591 
2592 	if (is_remote(r)) {
2593 		/* receive_cancel() calls do_cancel() on remote node */
2594 		error = send_cancel(r, lkb);
2595 	} else {
2596 		error = do_cancel(r, lkb);
2597 		/* for remote locks the cancel_reply is sent
2598 		   between do_cancel and do_cancel_effects */
2599 		do_cancel_effects(r, lkb, error);
2600 	}
2601 
2602 	return error;
2603 }
2604 
2605 /*
2606  * Four stage 2 varieties:
2607  * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2608  */
2609 
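/* Each stage 2 function brackets its work with lock_rsb()/unlock_rsb()
   and a reference on the rsb, validates the caller's args, and runs the
   stage 3 routine; request_lock() additionally creates or finds the rsb
   with find_rsb() first. */
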
2610 static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2611 			int len, struct dlm_args *args)
2612 {
2613 	struct dlm_rsb *r;
2614 	int error;
2615 
2616 	error = validate_lock_args(ls, lkb, args);
2617 	if (error)
2618 		goto out;
2619 
2620 	error = find_rsb(ls, name, len, R_CREATE, &r);
2621 	if (error)
2622 		goto out;
2623 
2624 	lock_rsb(r);
2625 
2626 	attach_lkb(r, lkb);
2627 	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2628 
2629 	error = _request_lock(r, lkb);
2630 
2631 	unlock_rsb(r);
2632 	put_rsb(r);
2633 
2634  out:
2635 	return error;
2636 }
2637 
2638 static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2639 			struct dlm_args *args)
2640 {
2641 	struct dlm_rsb *r;
2642 	int error;
2643 
2644 	r = lkb->lkb_resource;
2645 
2646 	hold_rsb(r);
2647 	lock_rsb(r);
2648 
2649 	error = validate_lock_args(ls, lkb, args);
2650 	if (error)
2651 		goto out;
2652 
2653 	error = _convert_lock(r, lkb);
2654  out:
2655 	unlock_rsb(r);
2656 	put_rsb(r);
2657 	return error;
2658 }
2659 
2660 static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2661 		       struct dlm_args *args)
2662 {
2663 	struct dlm_rsb *r;
2664 	int error;
2665 
2666 	r = lkb->lkb_resource;
2667 
2668 	hold_rsb(r);
2669 	lock_rsb(r);
2670 
2671 	error = validate_unlock_args(lkb, args);
2672 	if (error)
2673 		goto out;
2674 
2675 	error = _unlock_lock(r, lkb);
2676  out:
2677 	unlock_rsb(r);
2678 	put_rsb(r);
2679 	return error;
2680 }
2681 
2682 static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2683 		       struct dlm_args *args)
2684 {
2685 	struct dlm_rsb *r;
2686 	int error;
2687 
2688 	r = lkb->lkb_resource;
2689 
2690 	hold_rsb(r);
2691 	lock_rsb(r);
2692 
2693 	error = validate_unlock_args(lkb, args);
2694 	if (error)
2695 		goto out;
2696 
2697 	error = _cancel_lock(r, lkb);
2698  out:
2699 	unlock_rsb(r);
2700 	put_rsb(r);
2701 	return error;
2702 }
2703 
2704 /*
2705  * Two stage 1 varieties:  dlm_lock() and dlm_unlock()
2706  */
2707 
2708 int dlm_lock(dlm_lockspace_t *lockspace,
2709 	     int mode,
2710 	     struct dlm_lksb *lksb,
2711 	     uint32_t flags,
2712 	     void *name,
2713 	     unsigned int namelen,
2714 	     uint32_t parent_lkid,
2715 	     void (*ast) (void *astarg),
2716 	     void *astarg,
2717 	     void (*bast) (void *astarg, int mode))
2718 {
2719 	struct dlm_ls *ls;
2720 	struct dlm_lkb *lkb;
2721 	struct dlm_args args;
2722 	int error, convert = flags & DLM_LKF_CONVERT;
2723 
2724 	ls = dlm_find_lockspace_local(lockspace);
2725 	if (!ls)
2726 		return -EINVAL;
2727 
2728 	dlm_lock_recovery(ls);
2729 
2730 	if (convert)
2731 		error = find_lkb(ls, lksb->sb_lkid, &lkb);
2732 	else
2733 		error = create_lkb(ls, &lkb);
2734 
2735 	if (error)
2736 		goto out;
2737 
2738 	error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
2739 			      astarg, bast, &args);
2740 	if (error)
2741 		goto out_put;
2742 
2743 	if (convert)
2744 		error = convert_lock(ls, lkb, &args);
2745 	else
2746 		error = request_lock(ls, lkb, name, namelen, &args);
2747 
2748 	if (error == -EINPROGRESS)
2749 		error = 0;
2750  out_put:
2751 	if (convert || error)
2752 		__put_lkb(ls, lkb);
2753 	if (error == -EAGAIN || error == -EDEADLK)
2754 		error = 0;
2755  out:
2756 	dlm_unlock_recovery(ls);
2757 	dlm_put_lockspace(ls);
2758 	return error;
2759 }
2760 
2761 int dlm_unlock(dlm_lockspace_t *lockspace,
2762 	       uint32_t lkid,
2763 	       uint32_t flags,
2764 	       struct dlm_lksb *lksb,
2765 	       void *astarg)
2766 {
2767 	struct dlm_ls *ls;
2768 	struct dlm_lkb *lkb;
2769 	struct dlm_args args;
2770 	int error;
2771 
2772 	ls = dlm_find_lockspace_local(lockspace);
2773 	if (!ls)
2774 		return -EINVAL;
2775 
2776 	dlm_lock_recovery(ls);
2777 
2778 	error = find_lkb(ls, lkid, &lkb);
2779 	if (error)
2780 		goto out;
2781 
2782 	error = set_unlock_args(flags, astarg, &args);
2783 	if (error)
2784 		goto out_put;
2785 
2786 	if (flags & DLM_LKF_CANCEL)
2787 		error = cancel_lock(ls, lkb, &args);
2788 	else
2789 		error = unlock_lock(ls, lkb, &args);
2790 
2791 	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2792 		error = 0;
2793 	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2794 		error = 0;
2795  out_put:
2796 	dlm_put_lkb(lkb);
2797  out:
2798 	dlm_unlock_recovery(ls);
2799 	dlm_put_lockspace(ls);
2800 	return error;
2801 }
2802 
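/* A minimal usage sketch of the two entry points above; the lockspace
   handle "ls", the resource name, and the completion-based ast are
   hypothetical and only for illustration:

	static struct dlm_lksb my_lksb;
	static DECLARE_COMPLETION(my_done);

	static void my_ast(void *arg)
	{
		complete(&my_done);
	}

	error = dlm_lock(ls, DLM_LOCK_EX, &my_lksb, 0, "myres", 5,
			 0, my_ast, NULL, NULL);
	if (!error)
		wait_for_completion(&my_done);
	if (!error && !my_lksb.sb_status)
		error = dlm_unlock(ls, my_lksb.sb_lkid, 0, &my_lksb, NULL);

   dlm_lock() returns 0 once the request is under way; the final status
   arrives through the ast in lksb->sb_status (0 when granted), and a
   completed unlock reports sb_status == -DLM_EUNLOCK via the same ast. */
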
2803 /*
2804  * send/receive routines for remote operations and replies
2805  *
2806  * send_args
2807  * send_common
2808  * send_request			receive_request
2809  * send_convert			receive_convert
2810  * send_unlock			receive_unlock
2811  * send_cancel			receive_cancel
2812  * send_grant			receive_grant
2813  * send_bast			receive_bast
2814  * send_lookup			receive_lookup
2815  * send_remove			receive_remove
2816  *
2817  * 				send_common_reply
2818  * receive_request_reply	send_request_reply
2819  * receive_convert_reply	send_convert_reply
2820  * receive_unlock_reply		send_unlock_reply
2821  * receive_cancel_reply		send_cancel_reply
2822  * receive_lookup_reply		send_lookup_reply
2823  */
2824 
2825 static int _create_message(struct dlm_ls *ls, int mb_len,
2826 			   int to_nodeid, int mstype,
2827 			   struct dlm_message **ms_ret,
2828 			   struct dlm_mhandle **mh_ret)
2829 {
2830 	struct dlm_message *ms;
2831 	struct dlm_mhandle *mh;
2832 	char *mb;
2833 
2834 	/* get_buffer gives us a message handle (mh) that we need to
2835 	   pass into lowcomms_commit and a message buffer (mb) that we
2836 	   write our data into */
2837 
2838 	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_NOFS, &mb);
2839 	if (!mh)
2840 		return -ENOBUFS;
2841 
2842 	memset(mb, 0, mb_len);
2843 
2844 	ms = (struct dlm_message *) mb;
2845 
2846 	ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2847 	ms->m_header.h_lockspace = ls->ls_global_id;
2848 	ms->m_header.h_nodeid = dlm_our_nodeid();
2849 	ms->m_header.h_length = mb_len;
2850 	ms->m_header.h_cmd = DLM_MSG;
2851 
2852 	ms->m_type = mstype;
2853 
2854 	*mh_ret = mh;
2855 	*ms_ret = ms;
2856 	return 0;
2857 }
2858 
2859 static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2860 			  int to_nodeid, int mstype,
2861 			  struct dlm_message **ms_ret,
2862 			  struct dlm_mhandle **mh_ret)
2863 {
2864 	int mb_len = sizeof(struct dlm_message);
2865 
2866 	switch (mstype) {
2867 	case DLM_MSG_REQUEST:
2868 	case DLM_MSG_LOOKUP:
2869 	case DLM_MSG_REMOVE:
2870 		mb_len += r->res_length;
2871 		break;
2872 	case DLM_MSG_CONVERT:
2873 	case DLM_MSG_UNLOCK:
2874 	case DLM_MSG_REQUEST_REPLY:
2875 	case DLM_MSG_CONVERT_REPLY:
2876 	case DLM_MSG_GRANT:
2877 		if (lkb && lkb->lkb_lvbptr)
2878 			mb_len += r->res_ls->ls_lvblen;
2879 		break;
2880 	}
2881 
2882 	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2883 			       ms_ret, mh_ret);
2884 }
2885 
2886 /* further lowcomms enhancements or alternate implementations may make
2887    the return value from this function useful at some point */
2888 
2889 static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
2890 {
2891 	dlm_message_out(ms);
2892 	dlm_lowcomms_commit_buffer(mh);
2893 	return 0;
2894 }
2895 
2896 static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2897 		      struct dlm_message *ms)
2898 {
2899 	ms->m_nodeid   = lkb->lkb_nodeid;
2900 	ms->m_pid      = lkb->lkb_ownpid;
2901 	ms->m_lkid     = lkb->lkb_id;
2902 	ms->m_remid    = lkb->lkb_remid;
2903 	ms->m_exflags  = lkb->lkb_exflags;
2904 	ms->m_sbflags  = lkb->lkb_sbflags;
2905 	ms->m_flags    = lkb->lkb_flags;
2906 	ms->m_lvbseq   = lkb->lkb_lvbseq;
2907 	ms->m_status   = lkb->lkb_status;
2908 	ms->m_grmode   = lkb->lkb_grmode;
2909 	ms->m_rqmode   = lkb->lkb_rqmode;
2910 	ms->m_hash     = r->res_hash;
2911 
2912 	/* m_result and m_bastmode are set from function args,
2913 	   not from lkb fields */
2914 
2915 	if (lkb->lkb_bastfn)
2916 		ms->m_asts |= DLM_CB_BAST;
2917 	if (lkb->lkb_astfn)
2918 		ms->m_asts |= DLM_CB_CAST;
2919 
2920 	/* compare with switch in create_message; send_remove() doesn't
2921 	   use send_args() */
2922 
2923 	switch (ms->m_type) {
2924 	case DLM_MSG_REQUEST:
2925 	case DLM_MSG_LOOKUP:
2926 		memcpy(ms->m_extra, r->res_name, r->res_length);
2927 		break;
2928 	case DLM_MSG_CONVERT:
2929 	case DLM_MSG_UNLOCK:
2930 	case DLM_MSG_REQUEST_REPLY:
2931 	case DLM_MSG_CONVERT_REPLY:
2932 	case DLM_MSG_GRANT:
2933 		if (!lkb->lkb_lvbptr)
2934 			break;
2935 		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
2936 		break;
2937 	}
2938 }
2939 
2940 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2941 {
2942 	struct dlm_message *ms;
2943 	struct dlm_mhandle *mh;
2944 	int to_nodeid, error;
2945 
2946 	to_nodeid = r->res_nodeid;
2947 
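	/* the lkb must be on the waiters list before the message is sent,
	   since the reply could otherwise arrive and find no matching
	   waiter entry */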
2948 	error = add_to_waiters(lkb, mstype, to_nodeid);
2949 	if (error)
2950 		return error;
2951 
2952 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2953 	if (error)
2954 		goto fail;
2955 
2956 	send_args(r, lkb, ms);
2957 
2958 	error = send_message(mh, ms);
2959 	if (error)
2960 		goto fail;
2961 	return 0;
2962 
2963  fail:
2964 	remove_from_waiters(lkb, msg_reply_type(mstype));
2965 	return error;
2966 }
2967 
2968 static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2969 {
2970 	return send_common(r, lkb, DLM_MSG_REQUEST);
2971 }
2972 
2973 static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2974 {
2975 	int error;
2976 
2977 	error = send_common(r, lkb, DLM_MSG_CONVERT);
2978 
2979 	/* down conversions always succeed and go without a reply from the master */
2980 	if (!error && down_conversion(lkb)) {
2981 		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2982 		r->res_ls->ls_stub_ms.m_flags = DLM_IFL_STUB_MS;
2983 		r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
2984 		r->res_ls->ls_stub_ms.m_result = 0;
2985 		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2986 	}
2987 
2988 	return error;
2989 }
2990 
2991 /* FIXME: if this lkb is the only lock we hold on the rsb, then set
2992    MASTER_UNCERTAIN to force the next request on the rsb to confirm
2993    that the master is still correct. */
2994 
2995 static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2996 {
2997 	return send_common(r, lkb, DLM_MSG_UNLOCK);
2998 }
2999 
3000 static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
3001 {
3002 	return send_common(r, lkb, DLM_MSG_CANCEL);
3003 }
3004 
3005 static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
3006 {
3007 	struct dlm_message *ms;
3008 	struct dlm_mhandle *mh;
3009 	int to_nodeid, error;
3010 
3011 	to_nodeid = lkb->lkb_nodeid;
3012 
3013 	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
3014 	if (error)
3015 		goto out;
3016 
3017 	send_args(r, lkb, ms);
3018 
3019 	ms->m_result = 0;
3020 
3021 	error = send_message(mh, ms);
3022  out:
3023 	return error;
3024 }
3025 
3026 static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
3027 {
3028 	struct dlm_message *ms;
3029 	struct dlm_mhandle *mh;
3030 	int to_nodeid, error;
3031 
3032 	to_nodeid = lkb->lkb_nodeid;
3033 
3034 	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
3035 	if (error)
3036 		goto out;
3037 
3038 	send_args(r, lkb, ms);
3039 
3040 	ms->m_bastmode = mode;
3041 
3042 	error = send_message(mh, ms);
3043  out:
3044 	return error;
3045 }
3046 
3047 static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
3048 {
3049 	struct dlm_message *ms;
3050 	struct dlm_mhandle *mh;
3051 	int to_nodeid, error;
3052 
3053 	to_nodeid = dlm_dir_nodeid(r);
3054 
3055 	error = add_to_waiters(lkb, DLM_MSG_LOOKUP, to_nodeid);
3056 	if (error)
3057 		return error;
3058 
3059 	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
3060 	if (error)
3061 		goto fail;
3062 
3063 	send_args(r, lkb, ms);
3064 
3065 	error = send_message(mh, ms);
3066 	if (error)
3067 		goto fail;
3068 	return 0;
3069 
3070  fail:
3071 	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3072 	return error;
3073 }
3074 
3075 static int send_remove(struct dlm_rsb *r)
3076 {
3077 	struct dlm_message *ms;
3078 	struct dlm_mhandle *mh;
3079 	int to_nodeid, error;
3080 
3081 	to_nodeid = dlm_dir_nodeid(r);
3082 
3083 	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
3084 	if (error)
3085 		goto out;
3086 
3087 	memcpy(ms->m_extra, r->res_name, r->res_length);
3088 	ms->m_hash = r->res_hash;
3089 
3090 	error = send_message(mh, ms);
3091  out:
3092 	return error;
3093 }
3094 
3095 static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3096 			     int mstype, int rv)
3097 {
3098 	struct dlm_message *ms;
3099 	struct dlm_mhandle *mh;
3100 	int to_nodeid, error;
3101 
3102 	to_nodeid = lkb->lkb_nodeid;
3103 
3104 	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
3105 	if (error)
3106 		goto out;
3107 
3108 	send_args(r, lkb, ms);
3109 
3110 	ms->m_result = rv;
3111 
3112 	error = send_message(mh, ms);
3113  out:
3114 	return error;
3115 }
3116 
3117 static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3118 {
3119 	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
3120 }
3121 
3122 static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3123 {
3124 	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
3125 }
3126 
3127 static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3128 {
3129 	return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
3130 }
3131 
3132 static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
3133 {
3134 	return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
3135 }
3136 
3137 static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
3138 			     int ret_nodeid, int rv)
3139 {
3140 	struct dlm_rsb *r = &ls->ls_stub_rsb;
3141 	struct dlm_message *ms;
3142 	struct dlm_mhandle *mh;
3143 	int error, nodeid = ms_in->m_header.h_nodeid;
3144 
3145 	error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
3146 	if (error)
3147 		goto out;
3148 
3149 	ms->m_lkid = ms_in->m_lkid;
3150 	ms->m_result = rv;
3151 	ms->m_nodeid = ret_nodeid;
3152 
3153 	error = send_message(mh, ms);
3154  out:
3155 	return error;
3156 }
3157 
3158 /* which args we save from a received message depends heavily on the type
3159    of message, unlike the send side where we can safely send everything about
3160    the lkb for any type of message */
3161 
3162 static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
3163 {
3164 	lkb->lkb_exflags = ms->m_exflags;
3165 	lkb->lkb_sbflags = ms->m_sbflags;
3166 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3167 		         (ms->m_flags & 0x0000FFFF);
3168 }
3169 
3170 static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3171 {
3172 	if (ms->m_flags == DLM_IFL_STUB_MS)
3173 		return;
3174 
3175 	lkb->lkb_sbflags = ms->m_sbflags;
3176 	lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
3177 		         (ms->m_flags & 0x0000FFFF);
3178 }
3179 
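/* Only the low 16 bits of lkb_flags travel in messages; the upper 16
   bits (DLM_IFL_MSTCPY and friends) are node-local state, which is why
   both functions above preserve them when merging in ms->m_flags. */
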
3180 static int receive_extralen(struct dlm_message *ms)
3181 {
3182 	return (ms->m_header.h_length - sizeof(struct dlm_message));
3183 }
3184 
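/* e.g. a DLM_MSG_REQUEST is created with h_length equal to
   sizeof(struct dlm_message) + res_length (see create_message), so the
   subtraction above recovers exactly the resource-name length carried
   in m_extra[]. */
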
3185 static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
3186 		       struct dlm_message *ms)
3187 {
3188 	int len;
3189 
3190 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3191 		if (!lkb->lkb_lvbptr)
3192 			lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3193 		if (!lkb->lkb_lvbptr)
3194 			return -ENOMEM;
3195 		len = receive_extralen(ms);
3196 		if (len > DLM_RESNAME_MAXLEN)
3197 			len = DLM_RESNAME_MAXLEN;
3198 		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
3199 	}
3200 	return 0;
3201 }
3202 
3203 static void fake_bastfn(void *astparam, int mode)
3204 {
3205 	log_print("fake_bastfn should not be called");
3206 }
3207 
3208 static void fake_astfn(void *astparam)
3209 {
3210 	log_print("fake_astfn should not be called");
3211 }
3212 
3213 static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3214 				struct dlm_message *ms)
3215 {
3216 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
3217 	lkb->lkb_ownpid = ms->m_pid;
3218 	lkb->lkb_remid = ms->m_lkid;
3219 	lkb->lkb_grmode = DLM_LOCK_IV;
3220 	lkb->lkb_rqmode = ms->m_rqmode;
3221 
3222 	lkb->lkb_bastfn = (ms->m_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
3223 	lkb->lkb_astfn = (ms->m_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
3224 
3225 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
3226 		/* lkb was just created so there won't be an lvb yet */
3227 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
3228 		if (!lkb->lkb_lvbptr)
3229 			return -ENOMEM;
3230 	}
3231 
3232 	return 0;
3233 }
3234 
3235 static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3236 				struct dlm_message *ms)
3237 {
3238 	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
3239 		return -EBUSY;
3240 
3241 	if (receive_lvb(ls, lkb, ms))
3242 		return -ENOMEM;
3243 
3244 	lkb->lkb_rqmode = ms->m_rqmode;
3245 	lkb->lkb_lvbseq = ms->m_lvbseq;
3246 
3247 	return 0;
3248 }
3249 
3250 static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3251 			       struct dlm_message *ms)
3252 {
3253 	if (receive_lvb(ls, lkb, ms))
3254 		return -ENOMEM;
3255 	return 0;
3256 }
3257 
3258 /* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3259    uses to send a reply and that the remote end uses to process the reply. */
3260 
3261 static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3262 {
3263 	struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3264 	lkb->lkb_nodeid = ms->m_header.h_nodeid;
3265 	lkb->lkb_remid = ms->m_lkid;
3266 }
3267 
3268 /* This is called after the rsb is locked so that we can safely inspect
3269    fields in the lkb. */
3270 
3271 static int validate_message(struct dlm_lkb *lkb, struct dlm_message *ms)
3272 {
3273 	int from = ms->m_header.h_nodeid;
3274 	int error = 0;
3275 
3276 	switch (ms->m_type) {
3277 	case DLM_MSG_CONVERT:
3278 	case DLM_MSG_UNLOCK:
3279 	case DLM_MSG_CANCEL:
3280 		if (!is_master_copy(lkb) || lkb->lkb_nodeid != from)
3281 			error = -EINVAL;
3282 		break;
3283 
3284 	case DLM_MSG_CONVERT_REPLY:
3285 	case DLM_MSG_UNLOCK_REPLY:
3286 	case DLM_MSG_CANCEL_REPLY:
3287 	case DLM_MSG_GRANT:
3288 	case DLM_MSG_BAST:
3289 		if (!is_process_copy(lkb) || lkb->lkb_nodeid != from)
3290 			error = -EINVAL;
3291 		break;
3292 
3293 	case DLM_MSG_REQUEST_REPLY:
3294 		if (!is_process_copy(lkb))
3295 			error = -EINVAL;
3296 		else if (lkb->lkb_nodeid != -1 && lkb->lkb_nodeid != from)
3297 			error = -EINVAL;
3298 		break;
3299 
3300 	default:
3301 		error = -EINVAL;
3302 	}
3303 
3304 	if (error)
3305 		log_error(lkb->lkb_resource->res_ls,
3306 			  "ignore invalid message %d from %d %x %x %x %d",
3307 			  ms->m_type, from, lkb->lkb_id, lkb->lkb_remid,
3308 			  lkb->lkb_flags, lkb->lkb_nodeid);
3309 	return error;
3310 }
3311 
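/* is_master_copy() means this lkb is the master's copy of a lock held
   by another node (DLM_IFL_MSTCPY, set in receive_request below); a
   process copy is the requesting node's own lkb whose master is remote. */
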
3312 static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3313 {
3314 	struct dlm_lkb *lkb;
3315 	struct dlm_rsb *r;
3316 	int error, namelen;
3317 
3318 	error = create_lkb(ls, &lkb);
3319 	if (error)
3320 		goto fail;
3321 
3322 	receive_flags(lkb, ms);
3323 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
3324 	error = receive_request_args(ls, lkb, ms);
3325 	if (error) {
3326 		__put_lkb(ls, lkb);
3327 		goto fail;
3328 	}
3329 
3330 	namelen = receive_extralen(ms);
3331 
3332 	error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3333 	if (error) {
3334 		__put_lkb(ls, lkb);
3335 		goto fail;
3336 	}
3337 
3338 	lock_rsb(r);
3339 
3340 	attach_lkb(r, lkb);
3341 	error = do_request(r, lkb);
3342 	send_request_reply(r, lkb, error);
3343 	do_request_effects(r, lkb, error);
3344 
3345 	unlock_rsb(r);
3346 	put_rsb(r);
3347 
3348 	if (error == -EINPROGRESS)
3349 		error = 0;
3350 	if (error)
3351 		dlm_put_lkb(lkb);
3352 	return;
3353 
3354  fail:
3355 	setup_stub_lkb(ls, ms);
3356 	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3357 }
3358 
3359 static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3360 {
3361 	struct dlm_lkb *lkb;
3362 	struct dlm_rsb *r;
3363 	int error, reply = 1;
3364 
3365 	error = find_lkb(ls, ms->m_remid, &lkb);
3366 	if (error)
3367 		goto fail;
3368 
3369 	r = lkb->lkb_resource;
3370 
3371 	hold_rsb(r);
3372 	lock_rsb(r);
3373 
3374 	error = validate_message(lkb, ms);
3375 	if (error)
3376 		goto out;
3377 
3378 	receive_flags(lkb, ms);
3379 
3380 	error = receive_convert_args(ls, lkb, ms);
3381 	if (error) {
3382 		send_convert_reply(r, lkb, error);
3383 		goto out;
3384 	}
3385 
3386 	reply = !down_conversion(lkb);
3387 
3388 	error = do_convert(r, lkb);
3389 	if (reply)
3390 		send_convert_reply(r, lkb, error);
3391 	do_convert_effects(r, lkb, error);
3392  out:
3393 	unlock_rsb(r);
3394 	put_rsb(r);
3395 	dlm_put_lkb(lkb);
3396 	return;
3397 
3398  fail:
3399 	setup_stub_lkb(ls, ms);
3400 	send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3401 }
3402 
3403 static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3404 {
3405 	struct dlm_lkb *lkb;
3406 	struct dlm_rsb *r;
3407 	int error;
3408 
3409 	error = find_lkb(ls, ms->m_remid, &lkb);
3410 	if (error)
3411 		goto fail;
3412 
3413 	r = lkb->lkb_resource;
3414 
3415 	hold_rsb(r);
3416 	lock_rsb(r);
3417 
3418 	error = validate_message(lkb, ms);
3419 	if (error)
3420 		goto out;
3421 
3422 	receive_flags(lkb, ms);
3423 
3424 	error = receive_unlock_args(ls, lkb, ms);
3425 	if (error) {
3426 		send_unlock_reply(r, lkb, error);
3427 		goto out;
3428 	}
3429 
3430 	error = do_unlock(r, lkb);
3431 	send_unlock_reply(r, lkb, error);
3432 	do_unlock_effects(r, lkb, error);
3433  out:
3434 	unlock_rsb(r);
3435 	put_rsb(r);
3436 	dlm_put_lkb(lkb);
3437 	return;
3438 
3439  fail:
3440 	setup_stub_lkb(ls, ms);
3441 	send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3442 }
3443 
3444 static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3445 {
3446 	struct dlm_lkb *lkb;
3447 	struct dlm_rsb *r;
3448 	int error;
3449 
3450 	error = find_lkb(ls, ms->m_remid, &lkb);
3451 	if (error)
3452 		goto fail;
3453 
3454 	receive_flags(lkb, ms);
3455 
3456 	r = lkb->lkb_resource;
3457 
3458 	hold_rsb(r);
3459 	lock_rsb(r);
3460 
3461 	error = validate_message(lkb, ms);
3462 	if (error)
3463 		goto out;
3464 
3465 	error = do_cancel(r, lkb);
3466 	send_cancel_reply(r, lkb, error);
3467 	do_cancel_effects(r, lkb, error);
3468  out:
3469 	unlock_rsb(r);
3470 	put_rsb(r);
3471 	dlm_put_lkb(lkb);
3472 	return;
3473 
3474  fail:
3475 	setup_stub_lkb(ls, ms);
3476 	send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3477 }
3478 
3479 static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3480 {
3481 	struct dlm_lkb *lkb;
3482 	struct dlm_rsb *r;
3483 	int error;
3484 
3485 	error = find_lkb(ls, ms->m_remid, &lkb);
3486 	if (error) {
3487 		log_debug(ls, "receive_grant from %d no lkb %x",
3488 			  ms->m_header.h_nodeid, ms->m_remid);
3489 		return;
3490 	}
3491 
3492 	r = lkb->lkb_resource;
3493 
3494 	hold_rsb(r);
3495 	lock_rsb(r);
3496 
3497 	error = validate_message(lkb, ms);
3498 	if (error)
3499 		goto out;
3500 
3501 	receive_flags_reply(lkb, ms);
3502 	if (is_altmode(lkb))
3503 		munge_altmode(lkb, ms);
3504 	grant_lock_pc(r, lkb, ms);
3505 	queue_cast(r, lkb, 0);
3506  out:
3507 	unlock_rsb(r);
3508 	put_rsb(r);
3509 	dlm_put_lkb(lkb);
3510 }
3511 
3512 static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3513 {
3514 	struct dlm_lkb *lkb;
3515 	struct dlm_rsb *r;
3516 	int error;
3517 
3518 	error = find_lkb(ls, ms->m_remid, &lkb);
3519 	if (error) {
3520 		log_debug(ls, "receive_bast from %d no lkb %x",
3521 			  ms->m_header.h_nodeid, ms->m_remid);
3522 		return;
3523 	}
3524 
3525 	r = lkb->lkb_resource;
3526 
3527 	hold_rsb(r);
3528 	lock_rsb(r);
3529 
3530 	error = validate_message(lkb, ms);
3531 	if (error)
3532 		goto out;
3533 
3534 	queue_bast(r, lkb, ms->m_bastmode);
3535  out:
3536 	unlock_rsb(r);
3537 	put_rsb(r);
3538 	dlm_put_lkb(lkb);
3539 }
3540 
3541 static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3542 {
3543 	int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3544 
3545 	from_nodeid = ms->m_header.h_nodeid;
3546 	our_nodeid = dlm_our_nodeid();
3547 
3548 	len = receive_extralen(ms);
3549 
3550 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3551 	if (dir_nodeid != our_nodeid) {
3552 		log_error(ls, "lookup dir_nodeid %d from %d",
3553 			  dir_nodeid, from_nodeid);
3554 		error = -EINVAL;
3555 		ret_nodeid = -1;
3556 		goto out;
3557 	}
3558 
3559 	error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3560 
3561 	/* Optimization: we're master so treat lookup as a request */
3562 	if (!error && ret_nodeid == our_nodeid) {
3563 		receive_request(ls, ms);
3564 		return;
3565 	}
3566  out:
3567 	send_lookup_reply(ls, ms, ret_nodeid, error);
3568 }
3569 
3570 static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3571 {
3572 	int len, dir_nodeid, from_nodeid;
3573 
3574 	from_nodeid = ms->m_header.h_nodeid;
3575 
3576 	len = receive_extralen(ms);
3577 
3578 	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3579 	if (dir_nodeid != dlm_our_nodeid()) {
3580 		log_error(ls, "remove dir entry dir_nodeid %d from %d",
3581 			  dir_nodeid, from_nodeid);
3582 		return;
3583 	}
3584 
3585 	dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3586 }
3587 
3588 static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3589 {
3590 	do_purge(ls, ms->m_nodeid, ms->m_pid);
3591 }
3592 
3593 static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3594 {
3595 	struct dlm_lkb *lkb;
3596 	struct dlm_rsb *r;
3597 	int error, mstype, result;
3598 
3599 	error = find_lkb(ls, ms->m_remid, &lkb);
3600 	if (error) {
3601 		log_debug(ls, "receive_request_reply from %d no lkb %x",
3602 			  ms->m_header.h_nodeid, ms->m_remid);
3603 		return;
3604 	}
3605 
3606 	r = lkb->lkb_resource;
3607 	hold_rsb(r);
3608 	lock_rsb(r);
3609 
3610 	error = validate_message(lkb, ms);
3611 	if (error)
3612 		goto out;
3613 
3614 	mstype = lkb->lkb_wait_type;
3615 	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3616 	if (error)
3617 		goto out;
3618 
3619 	/* Optimization: the dir node was also the master, so it took our
3620 	   lookup as a request and sent request reply instead of lookup reply */
3621 	if (mstype == DLM_MSG_LOOKUP) {
3622 		r->res_nodeid = ms->m_header.h_nodeid;
3623 		lkb->lkb_nodeid = r->res_nodeid;
3624 	}
3625 
3626 	/* this is the value returned from do_request() on the master */
3627 	result = ms->m_result;
3628 
3629 	switch (result) {
3630 	case -EAGAIN:
3631 		/* request would block (be queued) on remote master */
3632 		queue_cast(r, lkb, -EAGAIN);
3633 		confirm_master(r, -EAGAIN);
3634 		unhold_lkb(lkb); /* undoes create_lkb() */
3635 		break;
3636 
3637 	case -EINPROGRESS:
3638 	case 0:
3639 		/* request was queued or granted on remote master */
3640 		receive_flags_reply(lkb, ms);
3641 		lkb->lkb_remid = ms->m_lkid;
3642 		if (is_altmode(lkb))
3643 			munge_altmode(lkb, ms);
3644 		if (result) {
3645 			add_lkb(r, lkb, DLM_LKSTS_WAITING);
3646 			add_timeout(lkb);
3647 		} else {
3648 			grant_lock_pc(r, lkb, ms);
3649 			queue_cast(r, lkb, 0);
3650 		}
3651 		confirm_master(r, result);
3652 		break;
3653 
3654 	case -EBADR:
3655 	case -ENOTBLK:
3656 		/* find_rsb failed to find rsb or rsb wasn't master */
3657 		log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3658 			  lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
3659 		r->res_nodeid = -1;
3660 		lkb->lkb_nodeid = -1;
3661 
3662 		if (is_overlap(lkb)) {
3663 			/* we'll ignore error in cancel/unlock reply */
3664 			queue_cast_overlap(r, lkb);
3665 			confirm_master(r, result);
3666 			unhold_lkb(lkb); /* undoes create_lkb() */
3667 		} else
3668 			_request_lock(r, lkb);
3669 		break;
3670 
3671 	default:
3672 		log_error(ls, "receive_request_reply %x error %d",
3673 			  lkb->lkb_id, result);
3674 	}
3675 
3676 	if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3677 		log_debug(ls, "receive_request_reply %x result %d unlock",
3678 			  lkb->lkb_id, result);
3679 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3680 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3681 		send_unlock(r, lkb);
3682 	} else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3683 		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3684 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3685 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3686 		send_cancel(r, lkb);
3687 	} else {
3688 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3689 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3690 	}
3691  out:
3692 	unlock_rsb(r);
3693 	put_rsb(r);
3694 	dlm_put_lkb(lkb);
3695 }
3696 
3697 static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3698 				    struct dlm_message *ms)
3699 {
3700 	/* this is the value returned from do_convert() on the master */
3701 	switch (ms->m_result) {
3702 	case -EAGAIN:
3703 		/* convert would block (be queued) on remote master */
3704 		queue_cast(r, lkb, -EAGAIN);
3705 		break;
3706 
3707 	case -EDEADLK:
3708 		receive_flags_reply(lkb, ms);
3709 		revert_lock_pc(r, lkb);
3710 		queue_cast(r, lkb, -EDEADLK);
3711 		break;
3712 
3713 	case -EINPROGRESS:
3714 		/* convert was queued on remote master */
3715 		receive_flags_reply(lkb, ms);
3716 		if (is_demoted(lkb))
3717 			munge_demoted(lkb);
3718 		del_lkb(r, lkb);
3719 		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
3720 		add_timeout(lkb);
3721 		break;
3722 
3723 	case 0:
3724 		/* convert was granted on remote master */
3725 		receive_flags_reply(lkb, ms);
3726 		if (is_demoted(lkb))
3727 			munge_demoted(lkb);
3728 		grant_lock_pc(r, lkb, ms);
3729 		queue_cast(r, lkb, 0);
3730 		break;
3731 
3732 	default:
3733 		log_error(r->res_ls, "receive_convert_reply %x error %d",
3734 			  lkb->lkb_id, ms->m_result);
3735 	}
3736 }
3737 
3738 static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3739 {
3740 	struct dlm_rsb *r = lkb->lkb_resource;
3741 	int error;
3742 
3743 	hold_rsb(r);
3744 	lock_rsb(r);
3745 
3746 	error = validate_message(lkb, ms);
3747 	if (error)
3748 		goto out;
3749 
3750 	/* stub reply can happen with waiters_mutex held */
3751 	error = remove_from_waiters_ms(lkb, ms);
3752 	if (error)
3753 		goto out;
3754 
3755 	__receive_convert_reply(r, lkb, ms);
3756  out:
3757 	unlock_rsb(r);
3758 	put_rsb(r);
3759 }
3760 
3761 static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3762 {
3763 	struct dlm_lkb *lkb;
3764 	int error;
3765 
3766 	error = find_lkb(ls, ms->m_remid, &lkb);
3767 	if (error) {
3768 		log_debug(ls, "receive_convert_reply from %d no lkb %x",
3769 			  ms->m_header.h_nodeid, ms->m_remid);
3770 		return;
3771 	}
3772 
3773 	_receive_convert_reply(lkb, ms);
3774 	dlm_put_lkb(lkb);
3775 }
3776 
3777 static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3778 {
3779 	struct dlm_rsb *r = lkb->lkb_resource;
3780 	int error;
3781 
3782 	hold_rsb(r);
3783 	lock_rsb(r);
3784 
3785 	error = validate_message(lkb, ms);
3786 	if (error)
3787 		goto out;
3788 
3789 	/* stub reply can happen with waiters_mutex held */
3790 	error = remove_from_waiters_ms(lkb, ms);
3791 	if (error)
3792 		goto out;
3793 
3794 	/* this is the value returned from do_unlock() on the master */
3795 
3796 	switch (ms->m_result) {
3797 	case -DLM_EUNLOCK:
3798 		receive_flags_reply(lkb, ms);
3799 		remove_lock_pc(r, lkb);
3800 		queue_cast(r, lkb, -DLM_EUNLOCK);
3801 		break;
3802 	case -ENOENT:
3803 		break;
3804 	default:
3805 		log_error(r->res_ls, "receive_unlock_reply %x error %d",
3806 			  lkb->lkb_id, ms->m_result);
3807 	}
3808  out:
3809 	unlock_rsb(r);
3810 	put_rsb(r);
3811 }
3812 
3813 static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3814 {
3815 	struct dlm_lkb *lkb;
3816 	int error;
3817 
3818 	error = find_lkb(ls, ms->m_remid, &lkb);
3819 	if (error) {
3820 		log_debug(ls, "receive_unlock_reply from %d no lkb %x",
3821 			  ms->m_header.h_nodeid, ms->m_remid);
3822 		return;
3823 	}
3824 
3825 	_receive_unlock_reply(lkb, ms);
3826 	dlm_put_lkb(lkb);
3827 }
3828 
3829 static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3830 {
3831 	struct dlm_rsb *r = lkb->lkb_resource;
3832 	int error;
3833 
3834 	hold_rsb(r);
3835 	lock_rsb(r);
3836 
3837 	error = validate_message(lkb, ms);
3838 	if (error)
3839 		goto out;
3840 
3841 	/* stub reply can happen with waiters_mutex held */
3842 	error = remove_from_waiters_ms(lkb, ms);
3843 	if (error)
3844 		goto out;
3845 
3846 	/* this is the value returned from do_cancel() on the master */
3847 
3848 	switch (ms->m_result) {
3849 	case -DLM_ECANCEL:
3850 		receive_flags_reply(lkb, ms);
3851 		revert_lock_pc(r, lkb);
3852 		queue_cast(r, lkb, -DLM_ECANCEL);
3853 		break;
3854 	case 0:
3855 		break;
3856 	default:
3857 		log_error(r->res_ls, "receive_cancel_reply %x error %d",
3858 			  lkb->lkb_id, ms->m_result);
3859 	}
3860  out:
3861 	unlock_rsb(r);
3862 	put_rsb(r);
3863 }
3864 
3865 static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3866 {
3867 	struct dlm_lkb *lkb;
3868 	int error;
3869 
3870 	error = find_lkb(ls, ms->m_remid, &lkb);
3871 	if (error) {
3872 		log_debug(ls, "receive_cancel_reply from %d no lkb %x",
3873 			  ms->m_header.h_nodeid, ms->m_remid);
3874 		return;
3875 	}
3876 
3877 	_receive_cancel_reply(lkb, ms);
3878 	dlm_put_lkb(lkb);
3879 }
3880 
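/* A note on the "stub reply can happen with waiters_mutex held" comments
   above: during recovery, dlm_recover_waiters_pre() calls
   _receive_convert_reply(), _receive_unlock_reply() and
   _receive_cancel_reply() directly with a fabricated DLM_IFL_STUB_MS
   message while already holding ls_waiters_mutex, which is why these
   paths use remove_from_waiters_ms() (it skips the mutex for stub
   messages) rather than remove_from_waiters(). */
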
3881 static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3882 {
3883 	struct dlm_lkb *lkb;
3884 	struct dlm_rsb *r;
3885 	int error, ret_nodeid;
3886 
3887 	error = find_lkb(ls, ms->m_lkid, &lkb);
3888 	if (error) {
3889 		log_error(ls, "receive_lookup_reply no lkb");
3890 		return;
3891 	}
3892 
3893 	/* ms->m_result is the value returned by dlm_dir_lookup on dir node
3894 	   FIXME: will a non-zero error ever be returned? */
3895 
3896 	r = lkb->lkb_resource;
3897 	hold_rsb(r);
3898 	lock_rsb(r);
3899 
3900 	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3901 	if (error)
3902 		goto out;
3903 
3904 	ret_nodeid = ms->m_nodeid;
3905 	if (ret_nodeid == dlm_our_nodeid()) {
3906 		r->res_nodeid = 0;
3907 		ret_nodeid = 0;
3908 		r->res_first_lkid = 0;
3909 	} else {
3910 		/* set_master() will copy res_nodeid to lkb_nodeid */
3911 		r->res_nodeid = ret_nodeid;
3912 	}
3913 
3914 	if (is_overlap(lkb)) {
3915 		log_debug(ls, "receive_lookup_reply %x unlock %x",
3916 			  lkb->lkb_id, lkb->lkb_flags);
3917 		queue_cast_overlap(r, lkb);
3918 		unhold_lkb(lkb); /* undoes create_lkb() */
3919 		goto out_list;
3920 	}
3921 
3922 	_request_lock(r, lkb);
3923 
3924  out_list:
3925 	if (!ret_nodeid)
3926 		process_lookup_list(r);
3927  out:
3928 	unlock_rsb(r);
3929 	put_rsb(r);
3930 	dlm_put_lkb(lkb);
3931 }
3932 
3933 static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms)
3934 {
3935 	if (!dlm_is_member(ls, ms->m_header.h_nodeid)) {
3936 		log_debug(ls, "ignore non-member message %d from %d %x %x %d",
3937 			  ms->m_type, ms->m_header.h_nodeid, ms->m_lkid,
3938 			  ms->m_remid, ms->m_result);
3939 		return;
3940 	}
3941 
3942 	switch (ms->m_type) {
3943 
3944 	/* messages sent to a master node */
3945 
3946 	case DLM_MSG_REQUEST:
3947 		receive_request(ls, ms);
3948 		break;
3949 
3950 	case DLM_MSG_CONVERT:
3951 		receive_convert(ls, ms);
3952 		break;
3953 
3954 	case DLM_MSG_UNLOCK:
3955 		receive_unlock(ls, ms);
3956 		break;
3957 
3958 	case DLM_MSG_CANCEL:
3959 		receive_cancel(ls, ms);
3960 		break;
3961 
3962 	/* messages sent from a master node (replies to above) */
3963 
3964 	case DLM_MSG_REQUEST_REPLY:
3965 		receive_request_reply(ls, ms);
3966 		break;
3967 
3968 	case DLM_MSG_CONVERT_REPLY:
3969 		receive_convert_reply(ls, ms);
3970 		break;
3971 
3972 	case DLM_MSG_UNLOCK_REPLY:
3973 		receive_unlock_reply(ls, ms);
3974 		break;
3975 
3976 	case DLM_MSG_CANCEL_REPLY:
3977 		receive_cancel_reply(ls, ms);
3978 		break;
3979 
3980 	/* messages sent from a master node (only two types of async msg) */
3981 
3982 	case DLM_MSG_GRANT:
3983 		receive_grant(ls, ms);
3984 		break;
3985 
3986 	case DLM_MSG_BAST:
3987 		receive_bast(ls, ms);
3988 		break;
3989 
3990 	/* messages sent to a dir node */
3991 
3992 	case DLM_MSG_LOOKUP:
3993 		receive_lookup(ls, ms);
3994 		break;
3995 
3996 	case DLM_MSG_REMOVE:
3997 		receive_remove(ls, ms);
3998 		break;
3999 
4000 	/* messages sent from a dir node (remove has no reply) */
4001 
4002 	case DLM_MSG_LOOKUP_REPLY:
4003 		receive_lookup_reply(ls, ms);
4004 		break;
4005 
4006 	/* other messages */
4007 
4008 	case DLM_MSG_PURGE:
4009 		receive_purge(ls, ms);
4010 		break;
4011 
4012 	default:
4013 		log_error(ls, "unknown message type %d", ms->m_type);
4014 	}
4015 }
4016 
4017 /* If the lockspace is in recovery mode (locking stopped), then normal
4018    messages are saved on the requestqueue for processing after recovery is
4019    done.  When not in recovery mode, we wait for dlm_recoverd to drain saved
4020    messages off the requestqueue before we process new ones. This occurs right
4021    after recovery completes when we transition from saving all messages on
4022    the requestqueue, to processing all the saved messages, to processing new
4023    messages as they arrive. */
4024 
4025 static void dlm_receive_message(struct dlm_ls *ls, struct dlm_message *ms,
4026 				int nodeid)
4027 {
4028 	if (dlm_locking_stopped(ls)) {
4029 		dlm_add_requestqueue(ls, nodeid, ms);
4030 	} else {
4031 		dlm_wait_requestqueue(ls);
4032 		_receive_message(ls, ms);
4033 	}
4034 }
4035 
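/* A rough sketch of that transition, assuming the usual dlm_recoverd
   sequence (the stop and drain helpers live in member.c, recoverd.c and
   requestqueue.c and are shown only for orientation):

	dlm_ls_stop(ls);		locking stops; dlm_receive_message()
					saves arrivals via dlm_add_requestqueue()
	...recovery runs, locking restarts...
	dlm_process_requestqueue(ls);	dlm_recoverd drains saved messages
					through dlm_receive_message_saved();
					meanwhile new arrivals wait in
					dlm_wait_requestqueue()
*/
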
4036 /* This is called by dlm_recoverd to process messages that were saved on
4037    the requestqueue. */
4038 
4039 void dlm_receive_message_saved(struct dlm_ls *ls, struct dlm_message *ms)
4040 {
4041 	_receive_message(ls, ms);
4042 }
4043 
4044 /* This is called by the midcomms layer when something is received for
4045    the lockspace.  It could be either a MSG (normal message sent as part of
4046    standard locking activity) or an RCOM (recovery message sent as part of
4047    lockspace recovery). */
4048 
4049 void dlm_receive_buffer(union dlm_packet *p, int nodeid)
4050 {
4051 	struct dlm_header *hd = &p->header;
4052 	struct dlm_ls *ls;
4053 	int type = 0;
4054 
4055 	switch (hd->h_cmd) {
4056 	case DLM_MSG:
4057 		dlm_message_in(&p->message);
4058 		type = p->message.m_type;
4059 		break;
4060 	case DLM_RCOM:
4061 		dlm_rcom_in(&p->rcom);
4062 		type = p->rcom.rc_type;
4063 		break;
4064 	default:
4065 		log_print("invalid h_cmd %d from %u", hd->h_cmd, nodeid);
4066 		return;
4067 	}
4068 
4069 	if (hd->h_nodeid != nodeid) {
4070 		log_print("invalid h_nodeid %d from %d lockspace %x",
4071 			  hd->h_nodeid, nodeid, hd->h_lockspace);
4072 		return;
4073 	}
4074 
4075 	ls = dlm_find_lockspace_global(hd->h_lockspace);
4076 	if (!ls) {
4077 		if (dlm_config.ci_log_debug)
4078 			log_print("invalid lockspace %x from %d cmd %d type %d",
4079 				  hd->h_lockspace, nodeid, hd->h_cmd, type);
4080 
4081 		if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
4082 			dlm_send_ls_not_ready(nodeid, &p->rcom);
4083 		return;
4084 	}
4085 
4086 	/* this rwsem allows dlm_ls_stop() to wait for all dlm_recv threads to
4087 	   be inactive (in this ls) before transitioning to recovery mode */
4088 
4089 	down_read(&ls->ls_recv_active);
4090 	if (hd->h_cmd == DLM_MSG)
4091 		dlm_receive_message(ls, &p->message, nodeid);
4092 	else
4093 		dlm_receive_rcom(ls, &p->rcom, nodeid);
4094 	up_read(&ls->ls_recv_active);
4095 
4096 	dlm_put_lockspace(ls);
4097 }
4098 
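/* A minimal caller sketch, assuming the on-wire little-endian header and
   that midcomms has assembled one complete packet in a contiguous buffer
   (the real caller, dlm_process_incoming_buffer() in midcomms.c, handles
   reassembly across receive buffers):

	union dlm_packet *p = (union dlm_packet *)buf;
	uint16_t msglen = le16_to_cpu(p->header.h_length);

	if (msglen > avail)
		return;			(incomplete packet, wait for more)
	dlm_receive_buffer(p, nodeid);
*/
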
4099 static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
4100 				   struct dlm_message *ms_stub)
4101 {
4102 	if (middle_conversion(lkb)) {
4103 		hold_lkb(lkb);
4104 		memset(ms_stub, 0, sizeof(struct dlm_message));
4105 		ms_stub->m_flags = DLM_IFL_STUB_MS;
4106 		ms_stub->m_type = DLM_MSG_CONVERT_REPLY;
4107 		ms_stub->m_result = -EINPROGRESS;
4108 		ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4109 		_receive_convert_reply(lkb, ms_stub);
4110 
4111 		/* Same special case as in receive_rcom_lock_args() */
4112 		lkb->lkb_grmode = DLM_LOCK_IV;
4113 		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
4114 		unhold_lkb(lkb);
4115 
4116 	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
4117 		lkb->lkb_flags |= DLM_IFL_RESEND;
4118 	}
4119 
4120 	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
4121 	   conversions are async; there's no reply from the remote master */
4122 }
4123 
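/* Worked example for the branches above, using the numeric dlm mode
   ordering IV < NL < CR < CW < PR < PW < EX (CW and PR are
   grant-incompatible with each other):

	grmode=PR rqmode=CW	middle conversion: stub -EINPROGRESS reply,
				grmode forced to IV until recover_conversion
	grmode=CR rqmode=EX	up-conversion: flagged DLM_IFL_RESEND
	grmode=EX rqmode=NL	down-conversion: never seen here, since no
				reply is expected from the remote master
*/
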
4124 /* A waiting lkb needs recovery if the master node has failed, or
4125    the master node is changing (only when no directory is used) */
4126 
4127 static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
4128 {
4129 	if (dlm_is_removed(ls, lkb->lkb_nodeid))
4130 		return 1;
4131 
4132 	if (!dlm_no_directory(ls))
4133 		return 0;
4134 
4135 	if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
4136 		return 1;
4137 
4138 	return 0;
4139 }
4140 
4141 /* Recovery for locks that are waiting for replies from nodes that are now
4142    gone.  We can just complete unlocks and cancels by faking a reply from the
4143    dead node.  Requests and up-conversions we flag to be resent after
4144    recovery.  Down-conversions can just be completed with a fake reply like
4145    unlocks.  Conversions between PR and CW need special attention. */
4146 
4147 void dlm_recover_waiters_pre(struct dlm_ls *ls)
4148 {
4149 	struct dlm_lkb *lkb, *safe;
4150 	struct dlm_message *ms_stub;
4151 	int wait_type, stub_unlock_result, stub_cancel_result;
4152 
4153 	ms_stub = kmalloc(sizeof(struct dlm_message), GFP_KERNEL);
4154 	if (!ms_stub) {
4155 		log_error(ls, "dlm_recover_waiters_pre no mem");
4156 		return;
4157 	}
4158 
4159 	mutex_lock(&ls->ls_waiters_mutex);
4160 
4161 	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
4162 
4163 		/* exclude debug messages about unlocks because there can be so
4164 		   many and they aren't very interesting */
4165 
4166 		if (lkb->lkb_wait_type != DLM_MSG_UNLOCK) {
4167 			log_debug(ls, "recover_waiter %x nodeid %d "
4168 				  "msg %d to %d", lkb->lkb_id, lkb->lkb_nodeid,
4169 				  lkb->lkb_wait_type, lkb->lkb_wait_nodeid);
4170 		}
4171 
4172 		/* all outstanding lookups, regardless of destination, will be
4173 		   resent after recovery is done */
4174 
4175 		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
4176 			lkb->lkb_flags |= DLM_IFL_RESEND;
4177 			continue;
4178 		}
4179 
4180 		if (!waiter_needs_recovery(ls, lkb))
4181 			continue;
4182 
4183 		wait_type = lkb->lkb_wait_type;
4184 		stub_unlock_result = -DLM_EUNLOCK;
4185 		stub_cancel_result = -DLM_ECANCEL;
4186 
4187 		/* Main reply may have been received leaving a zero wait_type,
4188 		   but a reply for the overlapping op may not have been
4189 		   received.  In that case we need to fake the appropriate
4190 		   reply for the overlap op. */
4191 
4192 		if (!wait_type) {
4193 			if (is_overlap_cancel(lkb)) {
4194 				wait_type = DLM_MSG_CANCEL;
4195 				if (lkb->lkb_grmode == DLM_LOCK_IV)
4196 					stub_cancel_result = 0;
4197 			}
4198 			if (is_overlap_unlock(lkb)) {
4199 				wait_type = DLM_MSG_UNLOCK;
4200 				if (lkb->lkb_grmode == DLM_LOCK_IV)
4201 					stub_unlock_result = -ENOENT;
4202 			}
4203 
4204 			log_debug(ls, "rwpre overlap %x %x %d %d %d",
4205 				  lkb->lkb_id, lkb->lkb_flags, wait_type,
4206 				  stub_cancel_result, stub_unlock_result);
4207 		}
4208 
4209 		switch (wait_type) {
4210 
4211 		case DLM_MSG_REQUEST:
4212 			lkb->lkb_flags |= DLM_IFL_RESEND;
4213 			break;
4214 
4215 		case DLM_MSG_CONVERT:
4216 			recover_convert_waiter(ls, lkb, ms_stub);
4217 			break;
4218 
4219 		case DLM_MSG_UNLOCK:
4220 			hold_lkb(lkb);
4221 			memset(ms_stub, 0, sizeof(struct dlm_message));
4222 			ms_stub->m_flags = DLM_IFL_STUB_MS;
4223 			ms_stub->m_type = DLM_MSG_UNLOCK_REPLY;
4224 			ms_stub->m_result = stub_unlock_result;
4225 			ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4226 			_receive_unlock_reply(lkb, ms_stub);
4227 			dlm_put_lkb(lkb);
4228 			break;
4229 
4230 		case DLM_MSG_CANCEL:
4231 			hold_lkb(lkb);
4232 			memset(ms_stub, 0, sizeof(struct dlm_message));
4233 			ms_stub->m_flags = DLM_IFL_STUB_MS;
4234 			ms_stub->m_type = DLM_MSG_CANCEL_REPLY;
4235 			ms_stub->m_result = stub_cancel_result;
4236 			ms_stub->m_header.h_nodeid = lkb->lkb_nodeid;
4237 			_receive_cancel_reply(lkb, ms_stub);
4238 			dlm_put_lkb(lkb);
4239 			break;
4240 
4241 		default:
4242 			log_error(ls, "invalid lkb wait_type %d %d",
4243 				  lkb->lkb_wait_type, wait_type);
4244 		}
4245 		schedule();
4246 	}
4247 	mutex_unlock(&ls->ls_waiters_mutex);
4248 	kfree(ms_stub);
4249 }
4250 
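/* Recap of the per-wait_type actions above:

	LOOKUP		DLM_IFL_RESEND (always, regardless of destination)
	REQUEST		DLM_IFL_RESEND
	CONVERT		recover_convert_waiter(): stub reply or RESEND
	UNLOCK		stub -DLM_EUNLOCK reply; -ENOENT for an overlap
			unlock of a never-granted lock (grmode IV)
	CANCEL		stub -DLM_ECANCEL reply; 0 for an overlap cancel
			of a never-granted lock (grmode IV)
*/
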
4251 static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
4252 {
4253 	struct dlm_lkb *lkb;
4254 	int found = 0;
4255 
4256 	mutex_lock(&ls->ls_waiters_mutex);
4257 	list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
4258 		if (lkb->lkb_flags & DLM_IFL_RESEND) {
4259 			hold_lkb(lkb);
4260 			found = 1;
4261 			break;
4262 		}
4263 	}
4264 	mutex_unlock(&ls->ls_waiters_mutex);
4265 
4266 	if (!found)
4267 		lkb = NULL;
4268 	return lkb;
4269 }
4270 
4271 /* Deal with lookups and lkb's marked RESEND from _pre.  We may now be the
4272    master or dir-node for r.  Processing the lkb may result in it being placed
4273    back on waiters. */
4274 
4275 /* We do this after normal locking has been enabled and any saved messages
4276    (in requestqueue) have been processed.  We should be confident that at
4277    this point we won't get or process a reply to any of these waiting
4278    operations.  But, new ops may be coming in on the rsbs/locks here from
4279    userspace or remotely. */
4280 
4281 /* there may have been an overlap unlock/cancel prior to recovery or after
4282    recovery.  if before, the lkb may still have a positive wait_count; if
4283    after, the overlap flag would just have been set and nothing new sent.
4284    we can be confident here that any replies to either the initial op or
4285    overlap ops prior to recovery have been received. */
4286 
4287 int dlm_recover_waiters_post(struct dlm_ls *ls)
4288 {
4289 	struct dlm_lkb *lkb;
4290 	struct dlm_rsb *r;
4291 	int error = 0, mstype, err, oc, ou;
4292 
4293 	while (1) {
4294 		if (dlm_locking_stopped(ls)) {
4295 			log_debug(ls, "recover_waiters_post aborted");
4296 			error = -EINTR;
4297 			break;
4298 		}
4299 
4300 		lkb = find_resend_waiter(ls);
4301 		if (!lkb)
4302 			break;
4303 
4304 		r = lkb->lkb_resource;
4305 		hold_rsb(r);
4306 		lock_rsb(r);
4307 
4308 		mstype = lkb->lkb_wait_type;
4309 		oc = is_overlap_cancel(lkb);
4310 		ou = is_overlap_unlock(lkb);
4311 		err = 0;
4312 
4313 		log_debug(ls, "recover_waiter %x nodeid %d msg %d r_nodeid %d",
4314 			  lkb->lkb_id, lkb->lkb_nodeid, mstype, r->res_nodeid);
4315 
4316 		/* At this point we assume that we won't get a reply to any
4317 		   previous op or overlap op on this lock.  First, do a big
4318 		   remove_from_waiters() for all previous ops. */
4319 
4320 		lkb->lkb_flags &= ~DLM_IFL_RESEND;
4321 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
4322 		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
4323 		lkb->lkb_wait_type = 0;
4324 		lkb->lkb_wait_count = 0;
4325 		mutex_lock(&ls->ls_waiters_mutex);
4326 		list_del_init(&lkb->lkb_wait_reply);
4327 		mutex_unlock(&ls->ls_waiters_mutex);
4328 		unhold_lkb(lkb); /* for waiters list */
4329 
4330 		if (oc || ou) {
4331 			/* do an unlock or cancel instead of resending */
4332 			switch (mstype) {
4333 			case DLM_MSG_LOOKUP:
4334 			case DLM_MSG_REQUEST:
4335 				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
4336 							-DLM_ECANCEL);
4337 				unhold_lkb(lkb); /* undoes create_lkb() */
4338 				break;
4339 			case DLM_MSG_CONVERT:
4340 				if (oc) {
4341 					queue_cast(r, lkb, -DLM_ECANCEL);
4342 				} else {
4343 					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
4344 					_unlock_lock(r, lkb);
4345 				}
4346 				break;
4347 			default:
4348 				err = 1;
4349 			}
4350 		} else {
4351 			switch (mstype) {
4352 			case DLM_MSG_LOOKUP:
4353 			case DLM_MSG_REQUEST:
4354 				_request_lock(r, lkb);
4355 				if (is_master(r))
4356 					confirm_master(r, 0);
4357 				break;
4358 			case DLM_MSG_CONVERT:
4359 				_convert_lock(r, lkb);
4360 				break;
4361 			default:
4362 				err = 1;
4363 			}
4364 		}
4365 
4366 		if (err)
4367 			log_error(ls, "recover_waiters_post %x %d %x %d %d",
4368 				  lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
4369 		unlock_rsb(r);
4370 		put_rsb(r);
4371 		dlm_put_lkb(lkb);
4372 	}
4373 
4374 	return error;
4375 }
4376 
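/* Summary of the cases above (oc = overlap cancel, ou = overlap unlock):

	oc/ou  LOOKUP,REQUEST	cast -DLM_ECANCEL/-DLM_EUNLOCK and drop
				the create_lkb() reference
	oc     CONVERT		cast -DLM_ECANCEL; the convert is cancelled
	ou     CONVERT		force-unlock via _unlock_lock() instead
	-      LOOKUP,REQUEST	resend via _request_lock()
	-      CONVERT		resend via _convert_lock()
*/
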
4377 static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
4378 			int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
4379 {
4380 	struct dlm_ls *ls = r->res_ls;
4381 	struct dlm_lkb *lkb, *safe;
4382 
4383 	list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
4384 		if (test(ls, lkb)) {
4385 			rsb_set_flag(r, RSB_LOCKS_PURGED);
4386 			del_lkb(r, lkb);
4387 			/* this put should free the lkb */
4388 			if (!dlm_put_lkb(lkb))
4389 				log_error(ls, "purged lkb not released");
4390 		}
4391 	}
4392 }
4393 
4394 static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4395 {
4396 	return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
4397 }
4398 
4399 static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
4400 {
4401 	return is_master_copy(lkb);
4402 }
4403 
4404 static void purge_dead_locks(struct dlm_rsb *r)
4405 {
4406 	purge_queue(r, &r->res_grantqueue, &purge_dead_test);
4407 	purge_queue(r, &r->res_convertqueue, &purge_dead_test);
4408 	purge_queue(r, &r->res_waitqueue, &purge_dead_test);
4409 }
4410 
4411 void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
4412 {
4413 	purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
4414 	purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
4415 	purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
4416 }
4417 
4418 /* Get rid of locks held by nodes that are gone. */
4419 
4420 int dlm_purge_locks(struct dlm_ls *ls)
4421 {
4422 	struct dlm_rsb *r;
4423 
4424 	log_debug(ls, "dlm_purge_locks");
4425 
4426 	down_write(&ls->ls_root_sem);
4427 	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4428 		hold_rsb(r);
4429 		lock_rsb(r);
4430 		if (is_master(r))
4431 			purge_dead_locks(r);
4432 		unlock_rsb(r);
4433 		unhold_rsb(r);
4434 
4435 		schedule();
4436 	}
4437 	up_write(&ls->ls_root_sem);
4438 
4439 	return 0;
4440 }
4441 
4442 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4443 {
4444 	struct dlm_rsb *r, *r_ret = NULL;
4445 
4446 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
4447 	list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4448 		if (!rsb_flag(r, RSB_LOCKS_PURGED))
4449 			continue;
4450 		hold_rsb(r);
4451 		rsb_clear_flag(r, RSB_LOCKS_PURGED);
4452 		r_ret = r;
4453 		break;
4454 	}
4455 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
4456 	return r_ret;
4457 }
4458 
4459 void dlm_grant_after_purge(struct dlm_ls *ls)
4460 {
4461 	struct dlm_rsb *r;
4462 	int bucket = 0;
4463 
4464 	while (1) {
4465 		r = find_purged_rsb(ls, bucket);
4466 		if (!r) {
4467 			if (bucket == ls->ls_rsbtbl_size - 1)
4468 				break;
4469 			bucket++;
4470 			continue;
4471 		}
4472 		lock_rsb(r);
4473 		if (is_master(r)) {
4474 			grant_pending_locks(r);
4475 			confirm_master(r, 0);
4476 		}
4477 		unlock_rsb(r);
4478 		put_rsb(r);
4479 		schedule();
4480 	}
4481 }
4482 
4483 static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4484 					 uint32_t remid)
4485 {
4486 	struct dlm_lkb *lkb;
4487 
4488 	list_for_each_entry(lkb, head, lkb_statequeue) {
4489 		if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4490 			return lkb;
4491 	}
4492 	return NULL;
4493 }
4494 
4495 static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4496 				    uint32_t remid)
4497 {
4498 	struct dlm_lkb *lkb;
4499 
4500 	lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4501 	if (lkb)
4502 		return lkb;
4503 	lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4504 	if (lkb)
4505 		return lkb;
4506 	lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4507 	if (lkb)
4508 		return lkb;
4509 	return NULL;
4510 }
4511 
4512 /* needs at least dlm_rcom + rcom_lock */
4513 static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4514 				  struct dlm_rsb *r, struct dlm_rcom *rc)
4515 {
4516 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4517 
4518 	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4519 	lkb->lkb_ownpid = le32_to_cpu(rl->rl_ownpid);
4520 	lkb->lkb_remid = le32_to_cpu(rl->rl_lkid);
4521 	lkb->lkb_exflags = le32_to_cpu(rl->rl_exflags);
4522 	lkb->lkb_flags = le32_to_cpu(rl->rl_flags) & 0x0000FFFF;
4523 	lkb->lkb_flags |= DLM_IFL_MSTCPY;
4524 	lkb->lkb_lvbseq = le32_to_cpu(rl->rl_lvbseq);
4525 	lkb->lkb_rqmode = rl->rl_rqmode;
4526 	lkb->lkb_grmode = rl->rl_grmode;
4527 	/* don't set lkb_status because add_lkb wants to itself */
4528 
4529 	lkb->lkb_bastfn = (rl->rl_asts & DLM_CB_BAST) ? &fake_bastfn : NULL;
4530 	lkb->lkb_astfn = (rl->rl_asts & DLM_CB_CAST) ? &fake_astfn : NULL;
4531 
4532 	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4533 		int lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4534 			 sizeof(struct rcom_lock);
4535 		if (lvblen > ls->ls_lvblen)
4536 			return -EINVAL;
4537 		lkb->lkb_lvbptr = dlm_allocate_lvb(ls);
4538 		if (!lkb->lkb_lvbptr)
4539 			return -ENOMEM;
4540 		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4541 	}
4542 
4543 	/* Conversions between PR and CW (middle modes) need special handling.
4544 	   The real granted mode of these converting locks cannot be determined
4545 	   until all locks have been rebuilt on the rsb (recover_conversion) */
4546 
4547 	if (rl->rl_wait_type == cpu_to_le16(DLM_MSG_CONVERT) &&
4548 	    middle_conversion(lkb)) {
4549 		rl->rl_status = DLM_LKSTS_CONVERT;
4550 		lkb->lkb_grmode = DLM_LOCK_IV;
4551 		rsb_set_flag(r, RSB_RECOVER_CONVERT);
4552 	}
4553 
4554 	return 0;
4555 }
4556 
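/* Buffer layout assumed by the lvblen computation above (VALBLK case;
   sizes come straight from the struct definitions):

	| struct dlm_rcom | struct rcom_lock | lvb, lvblen bytes |

	lvblen = rc_header.h_length - sizeof(struct dlm_rcom)
				    - sizeof(struct rcom_lock)

   which is also why callers must guarantee at least dlm_rcom + rcom_lock
   bytes before this function parses the buffer. */
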
4557 /* This lkb may have been recovered in a previous aborted recovery so we need
4558    to check if the rsb already has an lkb with the given remote nodeid/lkid.
4559    If so we just send back a standard reply.  If not, we create a new lkb with
4560    the given values and send back our lkid.  We send back our lkid by sending
4561    back the rcom_lock struct we got but with the remid field filled in. */
4562 
4563 /* needs at least dlm_rcom + rcom_lock */
4564 int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4565 {
4566 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4567 	struct dlm_rsb *r;
4568 	struct dlm_lkb *lkb;
4569 	int error;
4570 
4571 	if (rl->rl_parent_lkid) {
4572 		error = -EOPNOTSUPP;
4573 		goto out;
4574 	}
4575 
4576 	error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen),
4577 			 R_MASTER, &r);
4578 	if (error)
4579 		goto out;
4580 
4581 	lock_rsb(r);
4582 
4583 	lkb = search_remid(r, rc->rc_header.h_nodeid, le32_to_cpu(rl->rl_lkid));
4584 	if (lkb) {
4585 		error = -EEXIST;
4586 		goto out_remid;
4587 	}
4588 
4589 	error = create_lkb(ls, &lkb);
4590 	if (error)
4591 		goto out_unlock;
4592 
4593 	error = receive_rcom_lock_args(ls, lkb, r, rc);
4594 	if (error) {
4595 		__put_lkb(ls, lkb);
4596 		goto out_unlock;
4597 	}
4598 
4599 	attach_lkb(r, lkb);
4600 	add_lkb(r, lkb, rl->rl_status);
4601 	error = 0;
4602 
4603  out_remid:
4604 	/* this is the new value returned to the lock holder for
4605 	   saving in its process-copy lkb */
4606 	rl->rl_remid = cpu_to_le32(lkb->lkb_id);
4607 
4608  out_unlock:
4609 	unlock_rsb(r);
4610 	put_rsb(r);
4611  out:
4612 	if (error)
4613 		log_debug(ls, "recover_master_copy %d %x", error,
4614 			  le32_to_cpu(rl->rl_lkid));
4615 	rl->rl_result = cpu_to_le32(error);
4616 	return error;
4617 }
4618 
4619 /* needs at least dlm_rcom + rcom_lock */
4620 int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4621 {
4622 	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4623 	struct dlm_rsb *r;
4624 	struct dlm_lkb *lkb;
4625 	int error;
4626 
4627 	error = find_lkb(ls, le32_to_cpu(rl->rl_lkid), &lkb);
4628 	if (error) {
4629 		log_error(ls, "recover_process_copy no lkid %x",
4630 				le32_to_cpu(rl->rl_lkid));
4631 		return error;
4632 	}
4633 
4634 	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4635 
4636 	error = le32_to_cpu(rl->rl_result);
4637 
4638 	r = lkb->lkb_resource;
4639 	hold_rsb(r);
4640 	lock_rsb(r);
4641 
4642 	switch (error) {
4643 	case -EBADR:
4644 		/* There's a chance the new master received our lock before
4645 		   dlm_recover_master_reply(); this wouldn't happen if we did
4646 		   a barrier between recover_masters and recover_locks. */
4647 		log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4648 			  (unsigned long)r, r->res_name);
4649 		dlm_send_rcom_lock(r, lkb);
4650 		goto out;
4651 	case -EEXIST:
4652 		log_debug(ls, "master copy exists %x", lkb->lkb_id);
4653 		/* fall through */
4654 	case 0:
4655 		lkb->lkb_remid = le32_to_cpu(rl->rl_remid);
4656 		break;
4657 	default:
4658 		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4659 			  error, lkb->lkb_id);
4660 	}
4661 
4662 	/* an ack for dlm_recover_locks() which waits for replies from
4663 	   all the locks it sends to new masters */
4664 	dlm_recovered_lock(r);
4665  out:
4666 	unlock_rsb(r);
4667 	put_rsb(r);
4668 	dlm_put_lkb(lkb);
4669 
4670 	return 0;
4671 }
4672 
4673 int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4674 		     int mode, uint32_t flags, void *name, unsigned int namelen,
4675 		     unsigned long timeout_cs)
4676 {
4677 	struct dlm_lkb *lkb;
4678 	struct dlm_args args;
4679 	int error;
4680 
4681 	dlm_lock_recovery(ls);
4682 
4683 	error = create_lkb(ls, &lkb);
4684 	if (error) {
4685 		kfree(ua);
4686 		goto out;
4687 	}
4688 
4689 	if (flags & DLM_LKF_VALBLK) {
4690 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4691 		if (!ua->lksb.sb_lvbptr) {
4692 			kfree(ua);
4693 			__put_lkb(ls, lkb);
4694 			error = -ENOMEM;
4695 			goto out;
4696 		}
4697 	}
4698 
4699 	/* After ua is attached to lkb it will be freed by dlm_free_lkb().
4700 	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
4701 	   lock and that lkb_astparam is the dlm_user_args structure. */
4702 
4703 	error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
4704 			      fake_astfn, ua, fake_bastfn, &args);
4705 	lkb->lkb_flags |= DLM_IFL_USER;
4706 
4707 	if (error) {
4708 		__put_lkb(ls, lkb);
4709 		goto out;
4710 	}
4711 
4712 	error = request_lock(ls, lkb, name, namelen, &args);
4713 
4714 	switch (error) {
4715 	case 0:
4716 		break;
4717 	case -EINPROGRESS:
4718 		error = 0;
4719 		break;
4720 	case -EAGAIN:
4721 		error = 0;
4722 		/* fall through */
4723 	default:
4724 		__put_lkb(ls, lkb);
4725 		goto out;
4726 	}
4727 
4728 	/* add this new lkb to the per-process list of locks */
4729 	spin_lock(&ua->proc->locks_spin);
4730 	hold_lkb(lkb);
4731 	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4732 	spin_unlock(&ua->proc->locks_spin);
4733  out:
4734 	dlm_unlock_recovery(ls);
4735 	return error;
4736 }
4737 
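/* A usage sketch with hypothetical values (the real caller is the
   device_write() path in user.c, which builds ua from the userland
   request):

	error = dlm_user_request(ls, ua, DLM_LOCK_PR, DLM_LKF_VALBLK,
				 "myres", 5, 0);

   Per the comment inside the function, ownership of ua passes to the
   lkb once attached, and dlm_free_lkb() releases it. */
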
4738 int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4739 		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4740 		     unsigned long timeout_cs)
4741 {
4742 	struct dlm_lkb *lkb;
4743 	struct dlm_args args;
4744 	struct dlm_user_args *ua;
4745 	int error;
4746 
4747 	dlm_lock_recovery(ls);
4748 
4749 	error = find_lkb(ls, lkid, &lkb);
4750 	if (error)
4751 		goto out;
4752 
4753 	/* user can change the params on its lock when it converts it, or
4754 	   add an lvb that didn't exist before */
4755 
4756 	ua = lkb->lkb_ua;
4757 
4758 	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
4759 		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_NOFS);
4760 		if (!ua->lksb.sb_lvbptr) {
4761 			error = -ENOMEM;
4762 			goto out_put;
4763 		}
4764 	}
4765 	if (lvb_in && ua->lksb.sb_lvbptr)
4766 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4767 
4768 	ua->xid = ua_tmp->xid;
4769 	ua->castparam = ua_tmp->castparam;
4770 	ua->castaddr = ua_tmp->castaddr;
4771 	ua->bastparam = ua_tmp->bastparam;
4772 	ua->bastaddr = ua_tmp->bastaddr;
4773 	ua->user_lksb = ua_tmp->user_lksb;
4774 
4775 	error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4776 			      fake_astfn, ua, fake_bastfn, &args);
4777 	if (error)
4778 		goto out_put;
4779 
4780 	error = convert_lock(ls, lkb, &args);
4781 
4782 	if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
4783 		error = 0;
4784  out_put:
4785 	dlm_put_lkb(lkb);
4786  out:
4787 	dlm_unlock_recovery(ls);
4788 	kfree(ua_tmp);
4789 	return error;
4790 }
4791 
4792 int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4793 		    uint32_t flags, uint32_t lkid, char *lvb_in)
4794 {
4795 	struct dlm_lkb *lkb;
4796 	struct dlm_args args;
4797 	struct dlm_user_args *ua;
4798 	int error;
4799 
4800 	dlm_lock_recovery(ls);
4801 
4802 	error = find_lkb(ls, lkid, &lkb);
4803 	if (error)
4804 		goto out;
4805 
4806 	ua = lkb->lkb_ua;
4807 
4808 	if (lvb_in && ua->lksb.sb_lvbptr)
4809 		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4810 	if (ua_tmp->castparam)
4811 		ua->castparam = ua_tmp->castparam;
4812 	ua->user_lksb = ua_tmp->user_lksb;
4813 
4814 	error = set_unlock_args(flags, ua, &args);
4815 	if (error)
4816 		goto out_put;
4817 
4818 	error = unlock_lock(ls, lkb, &args);
4819 
4820 	if (error == -DLM_EUNLOCK)
4821 		error = 0;
4822 	/* from validate_unlock_args() */
4823 	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4824 		error = 0;
4825 	if (error)
4826 		goto out_put;
4827 
4828 	spin_lock(&ua->proc->locks_spin);
4829 	/* dlm_user_add_cb() may have already taken lkb off the proc list */
4830 	if (!list_empty(&lkb->lkb_ownqueue))
4831 		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
4832 	spin_unlock(&ua->proc->locks_spin);
4833  out_put:
4834 	dlm_put_lkb(lkb);
4835  out:
4836 	dlm_unlock_recovery(ls);
4837 	kfree(ua_tmp);
4838 	return error;
4839 }
4840 
4841 int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4842 		    uint32_t flags, uint32_t lkid)
4843 {
4844 	struct dlm_lkb *lkb;
4845 	struct dlm_args args;
4846 	struct dlm_user_args *ua;
4847 	int error;
4848 
4849 	dlm_lock_recovery(ls);
4850 
4851 	error = find_lkb(ls, lkid, &lkb);
4852 	if (error)
4853 		goto out;
4854 
4855 	ua = lkb->lkb_ua;
4856 	if (ua_tmp->castparam)
4857 		ua->castparam = ua_tmp->castparam;
4858 	ua->user_lksb = ua_tmp->user_lksb;
4859 
4860 	error = set_unlock_args(flags, ua, &args);
4861 	if (error)
4862 		goto out_put;
4863 
4864 	error = cancel_lock(ls, lkb, &args);
4865 
4866 	if (error == -DLM_ECANCEL)
4867 		error = 0;
4868 	/* from validate_unlock_args() */
4869 	if (error == -EBUSY)
4870 		error = 0;
4871  out_put:
4872 	dlm_put_lkb(lkb);
4873  out:
4874 	dlm_unlock_recovery(ls);
4875 	kfree(ua_tmp);
4876 	return error;
4877 }
4878 
4879 int dlm_user_deadlock(struct dlm_ls *ls, uint32_t flags, uint32_t lkid)
4880 {
4881 	struct dlm_lkb *lkb;
4882 	struct dlm_args args;
4883 	struct dlm_user_args *ua;
4884 	struct dlm_rsb *r;
4885 	int error;
4886 
4887 	dlm_lock_recovery(ls);
4888 
4889 	error = find_lkb(ls, lkid, &lkb);
4890 	if (error)
4891 		goto out;
4892 
4893 	ua = lkb->lkb_ua;
4894 
4895 	error = set_unlock_args(flags, ua, &args);
4896 	if (error)
4897 		goto out_put;
4898 
4899 	/* same as cancel_lock(), but set DEADLOCK_CANCEL after lock_rsb */
4900 
4901 	r = lkb->lkb_resource;
4902 	hold_rsb(r);
4903 	lock_rsb(r);
4904 
4905 	error = validate_unlock_args(lkb, &args);
4906 	if (error)
4907 		goto out_r;
4908 	lkb->lkb_flags |= DLM_IFL_DEADLOCK_CANCEL;
4909 
4910 	error = _cancel_lock(r, lkb);
4911  out_r:
4912 	unlock_rsb(r);
4913 	put_rsb(r);
4914 
4915 	if (error == -DLM_ECANCEL)
4916 		error = 0;
4917 	/* from validate_unlock_args() */
4918 	if (error == -EBUSY)
4919 		error = 0;
4920  out_put:
4921 	dlm_put_lkb(lkb);
4922  out:
4923 	dlm_unlock_recovery(ls);
4924 	return error;
4925 }
4926 
4927 /* lkb's that are removed from the waiters list by revert are just left on the
4928    orphans list with the granted orphan locks, to be freed by purge */
4929 
4930 static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4931 {
4932 	struct dlm_args args;
4933 	int error;
4934 
4935 	hold_lkb(lkb);
4936 	mutex_lock(&ls->ls_orphans_mutex);
4937 	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4938 	mutex_unlock(&ls->ls_orphans_mutex);
4939 
4940 	set_unlock_args(0, lkb->lkb_ua, &args);
4941 
4942 	error = cancel_lock(ls, lkb, &args);
4943 	if (error == -DLM_ECANCEL)
4944 		error = 0;
4945 	return error;
4946 }
4947 
4948 /* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4949    Regardless of what rsb queue the lock is on, it's removed and freed. */
4950 
4951 static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4952 {
4953 	struct dlm_args args;
4954 	int error;
4955 
4956 	set_unlock_args(DLM_LKF_FORCEUNLOCK, lkb->lkb_ua, &args);
4957 
4958 	error = unlock_lock(ls, lkb, &args);
4959 	if (error == -DLM_EUNLOCK)
4960 		error = 0;
4961 	return error;
4962 }
4963 
4964 /* We have to release the clear_proc_locks mutex before calling
4965    unlock_proc_lock() (which takes lock_rsb), to avoid deadlocking against a
4966    message receive that does lock_rsb followed by dlm_user_add_cb() */
4967 
4968 static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4969 				     struct dlm_user_proc *proc)
4970 {
4971 	struct dlm_lkb *lkb = NULL;
4972 
4973 	mutex_lock(&ls->ls_clear_proc_locks);
4974 	if (list_empty(&proc->locks))
4975 		goto out;
4976 
4977 	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4978 	list_del_init(&lkb->lkb_ownqueue);
4979 
4980 	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4981 		lkb->lkb_flags |= DLM_IFL_ORPHAN;
4982 	else
4983 		lkb->lkb_flags |= DLM_IFL_DEAD;
4984  out:
4985 	mutex_unlock(&ls->ls_clear_proc_locks);
4986 	return lkb;
4987 }
4988 
4989 /* The ls_clear_proc_locks mutex protects against dlm_user_add_cb() which
4990    1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4991    which we clear here. */
4992 
4993 /* proc CLOSING flag is set so no more device_reads should look at proc->asts
4994    list, and no more device_writes should add lkb's to proc->locks list; so we
4995    shouldn't need to take asts_spin or locks_spin here.  this assumes that
4996    device reads/writes/closes are serialized -- FIXME: we may need to serialize
4997    them ourselves. */
4998 
4999 void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5000 {
5001 	struct dlm_lkb *lkb, *safe;
5002 
5003 	dlm_lock_recovery(ls);
5004 
5005 	while (1) {
5006 		lkb = del_proc_lock(ls, proc);
5007 		if (!lkb)
5008 			break;
5009 		del_timeout(lkb);
5010 		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
5011 			orphan_proc_lock(ls, lkb);
5012 		else
5013 			unlock_proc_lock(ls, lkb);
5014 
5015 		/* this removes the reference for the proc->locks list
5016 		   added by dlm_user_request, it may result in the lkb
5017 		   being freed */
5018 
5019 		dlm_put_lkb(lkb);
5020 	}
5021 
5022 	mutex_lock(&ls->ls_clear_proc_locks);
5023 
5024 	/* in-progress unlocks */
5025 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
5026 		list_del_init(&lkb->lkb_ownqueue);
5027 		lkb->lkb_flags |= DLM_IFL_DEAD;
5028 		dlm_put_lkb(lkb);
5029 	}
5030 
5031 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
5032 		memset(&lkb->lkb_callbacks, 0,
5033 		       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
5034 		list_del_init(&lkb->lkb_cb_list);
5035 		dlm_put_lkb(lkb);
5036 	}
5037 
5038 	mutex_unlock(&ls->ls_clear_proc_locks);
5039 	dlm_unlock_recovery(ls);
5040 }
5041 
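/* The deadlock avoided by dropping ls_clear_proc_locks before each
   unlock_proc_lock() call, sketched as an interleaving:

	dlm_clear_proc_locks		dlm_recv
	--------------------		--------
	holds ls_clear_proc_locks	holds lock_rsb(r)
	wants lock_rsb(r)		wants ls_clear_proc_locks
	(via unlock_proc_lock)		(via dlm_user_add_cb)

   hence the loop takes one lkb off the list under the mutex in
   del_proc_lock(), then operates on it with the mutex released. */
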
5042 static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
5043 {
5044 	struct dlm_lkb *lkb, *safe;
5045 
5046 	while (1) {
5047 		lkb = NULL;
5048 		spin_lock(&proc->locks_spin);
5049 		if (!list_empty(&proc->locks)) {
5050 			lkb = list_entry(proc->locks.next, struct dlm_lkb,
5051 					 lkb_ownqueue);
5052 			list_del_init(&lkb->lkb_ownqueue);
5053 		}
5054 		spin_unlock(&proc->locks_spin);
5055 
5056 		if (!lkb)
5057 			break;
5058 
5059 		lkb->lkb_flags |= DLM_IFL_DEAD;
5060 		unlock_proc_lock(ls, lkb);
5061 		dlm_put_lkb(lkb); /* ref from proc->locks list */
5062 	}
5063 
5064 	spin_lock(&proc->locks_spin);
5065 	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
5066 		list_del_init(&lkb->lkb_ownqueue);
5067 		lkb->lkb_flags |= DLM_IFL_DEAD;
5068 		dlm_put_lkb(lkb);
5069 	}
5070 	spin_unlock(&proc->locks_spin);
5071 
5072 	spin_lock(&proc->asts_spin);
5073 	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_cb_list) {
5074 		memset(&lkb->lkb_callbacks, 0,
5075 		       sizeof(struct dlm_callback) * DLM_CALLBACKS_SIZE);
5076 		list_del_init(&lkb->lkb_cb_list);
5077 		dlm_put_lkb(lkb);
5078 	}
5079 	spin_unlock(&proc->asts_spin);
5080 }
5081 
5082 /* pid of 0 means purge all orphans */
5083 
5084 static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
5085 {
5086 	struct dlm_lkb *lkb, *safe;
5087 
5088 	mutex_lock(&ls->ls_orphans_mutex);
5089 	list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
5090 		if (pid && lkb->lkb_ownpid != pid)
5091 			continue;
5092 		unlock_proc_lock(ls, lkb);
5093 		list_del_init(&lkb->lkb_ownqueue);
5094 		dlm_put_lkb(lkb);
5095 	}
5096 	mutex_unlock(&ls->ls_orphans_mutex);
5097 }
5098 
5099 static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
5100 {
5101 	struct dlm_message *ms;
5102 	struct dlm_mhandle *mh;
5103 	int error;
5104 
5105 	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
5106 				DLM_MSG_PURGE, &ms, &mh);
5107 	if (error)
5108 		return error;
5109 	ms->m_nodeid = nodeid;
5110 	ms->m_pid = pid;
5111 
5112 	return send_message(mh, ms);
5113 }
5114 
5115 int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
5116 		   int nodeid, int pid)
5117 {
5118 	int error = 0;
5119 
5120 	if (nodeid != dlm_our_nodeid()) {
5121 		error = send_purge(ls, nodeid, pid);
5122 	} else {
5123 		dlm_lock_recovery(ls);
5124 		if (pid == current->pid)
5125 			purge_proc_locks(ls, proc);
5126 		else
5127 			do_purge(ls, nodeid, pid);
5128 		dlm_unlock_recovery(ls);
5129 	}
5130 	return error;
5131 }
5132 
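/* Usage sketch with hypothetical values: purge orphans left behind by
   pid 1234 on node 2 (sent as a DLM_MSG_PURGE message), versus purging
   the calling process's own locks locally:

	dlm_user_purge(ls, proc, 2, 1234);
	dlm_user_purge(ls, proc, dlm_our_nodeid(), current->pid);
*/
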
5133