xref: /linux/fs/ocfs2/dlm/dlmmaster.c (revision f24e9f586b377749dff37554696cf3a105540c94)
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmmod.c
5  *
6  * standalone DLM module
7  *
8  * Copyright (C) 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 021110-1307, USA.
24  *
25  */
26 
27 
28 #include <linux/module.h>
29 #include <linux/fs.h>
30 #include <linux/types.h>
31 #include <linux/slab.h>
32 #include <linux/highmem.h>
33 #include <linux/utsname.h>
34 #include <linux/init.h>
35 #include <linux/sysctl.h>
36 #include <linux/random.h>
37 #include <linux/blkdev.h>
38 #include <linux/socket.h>
39 #include <linux/inet.h>
40 #include <linux/spinlock.h>
41 #include <linux/delay.h>
42 
43 
44 #include "cluster/heartbeat.h"
45 #include "cluster/nodemanager.h"
46 #include "cluster/tcp.h"
47 
48 #include "dlmapi.h"
49 #include "dlmcommon.h"
50 #include "dlmdomain.h"
51 
52 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
53 #include "cluster/masklog.h"
54 
55 enum dlm_mle_type {
56 	DLM_MLE_BLOCK,
57 	DLM_MLE_MASTER,
58 	DLM_MLE_MIGRATION
59 };
60 
61 struct dlm_lock_name
62 {
63 	u8 len;
64 	u8 name[DLM_LOCKID_NAME_MAX];
65 };
66 
67 struct dlm_master_list_entry
68 {
69 	struct list_head list;
70 	struct list_head hb_events;
71 	struct dlm_ctxt *dlm;
72 	spinlock_t spinlock;
73 	wait_queue_head_t wq;
74 	atomic_t woken;
75 	struct kref mle_refs;
76 	int inuse;
77 	unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
78 	unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
79 	unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
80 	unsigned long node_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
81 	u8 master;
82 	u8 new_master;
83 	enum dlm_mle_type type;
84 	struct o2hb_callback_func mle_hb_up;
85 	struct o2hb_callback_func mle_hb_down;
86 	union {
87 		struct dlm_lock_resource *res;
88 		struct dlm_lock_name name;
89 	} u;
90 };
91 
92 static void dlm_mle_node_down(struct dlm_ctxt *dlm,
93 			      struct dlm_master_list_entry *mle,
94 			      struct o2nm_node *node,
95 			      int idx);
96 static void dlm_mle_node_up(struct dlm_ctxt *dlm,
97 			    struct dlm_master_list_entry *mle,
98 			    struct o2nm_node *node,
99 			    int idx);
100 
101 static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
102 static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
103 				unsigned int namelen, void *nodemap,
104 				u32 flags);
105 
106 static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
107 				struct dlm_master_list_entry *mle,
108 				const char *name,
109 				unsigned int namelen)
110 {
111 	struct dlm_lock_resource *res;
112 
113 	if (dlm != mle->dlm)
114 		return 0;
115 
116 	if (mle->type == DLM_MLE_BLOCK ||
117 	    mle->type == DLM_MLE_MIGRATION) {
118 		if (namelen != mle->u.name.len ||
119     	    	    memcmp(name, mle->u.name.name, namelen)!=0)
120 			return 0;
121 	} else {
122 		res = mle->u.res;
123 		if (namelen != res->lockname.len ||
124 		    memcmp(res->lockname.name, name, namelen) != 0)
125 			return 0;
126 	}
127 	return 1;
128 }
129 
130 #define dlm_print_nodemap(m)  _dlm_print_nodemap(m,#m)
131 static void _dlm_print_nodemap(unsigned long *map, const char *mapname)
132 {
133 	int i;
134 	printk("%s=[ ", mapname);
135 	for (i=0; i<O2NM_MAX_NODES; i++)
136 		if (test_bit(i, map))
137 			printk("%d ", i);
138 	printk("]");
139 }
140 
141 static void dlm_print_one_mle(struct dlm_master_list_entry *mle)
142 {
143 	int refs;
144 	char *type;
145 	char attached;
146 	u8 master;
147 	unsigned int namelen;
148 	const char *name;
149 	struct kref *k;
150 	unsigned long *maybe = mle->maybe_map,
151 		      *vote = mle->vote_map,
152 		      *resp = mle->response_map,
153 		      *node = mle->node_map;
154 
155 	k = &mle->mle_refs;
156 	if (mle->type == DLM_MLE_BLOCK)
157 		type = "BLK";
158 	else if (mle->type == DLM_MLE_MASTER)
159 		type = "MAS";
160 	else
161 		type = "MIG";
162 	refs = atomic_read(&k->refcount);
163 	master = mle->master;
164 	attached = (list_empty(&mle->hb_events) ? 'N' : 'Y');
165 
166 	if (mle->type != DLM_MLE_MASTER) {
167 		namelen = mle->u.name.len;
168 		name = mle->u.name.name;
169 	} else {
170 		namelen = mle->u.res->lockname.len;
171 		name = mle->u.res->lockname.name;
172 	}
173 
174 	mlog(ML_NOTICE, "%.*s: %3s refs=%3d mas=%3u new=%3u evt=%c inuse=%d ",
175 		  namelen, name, type, refs, master, mle->new_master, attached,
176 		  mle->inuse);
177 	dlm_print_nodemap(maybe);
178 	printk(", ");
179 	dlm_print_nodemap(vote);
180 	printk(", ");
181 	dlm_print_nodemap(resp);
182 	printk(", ");
183 	dlm_print_nodemap(node);
184 	printk(", ");
185 	printk("\n");
186 }
187 
188 #if 0
189 /* Code here is included but defined out as it aids debugging */
190 
191 static void dlm_dump_mles(struct dlm_ctxt *dlm)
192 {
193 	struct dlm_master_list_entry *mle;
194 	struct list_head *iter;
195 
196 	mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name);
197 	spin_lock(&dlm->master_lock);
198 	list_for_each(iter, &dlm->master_list) {
199 		mle = list_entry(iter, struct dlm_master_list_entry, list);
200 		dlm_print_one_mle(mle);
201 	}
202 	spin_unlock(&dlm->master_lock);
203 }
204 
205 int dlm_dump_all_mles(const char __user *data, unsigned int len)
206 {
207 	struct list_head *iter;
208 	struct dlm_ctxt *dlm;
209 
210 	spin_lock(&dlm_domain_lock);
211 	list_for_each(iter, &dlm_domains) {
212 		dlm = list_entry (iter, struct dlm_ctxt, list);
213 		mlog(ML_NOTICE, "found dlm: %p, name=%s\n", dlm, dlm->name);
214 		dlm_dump_mles(dlm);
215 	}
216 	spin_unlock(&dlm_domain_lock);
217 	return len;
218 }
219 EXPORT_SYMBOL_GPL(dlm_dump_all_mles);
220 
221 #endif  /*  0  */
222 
223 
224 static kmem_cache_t *dlm_mle_cache = NULL;
225 
226 
227 static void dlm_mle_release(struct kref *kref);
228 static void dlm_init_mle(struct dlm_master_list_entry *mle,
229 			enum dlm_mle_type type,
230 			struct dlm_ctxt *dlm,
231 			struct dlm_lock_resource *res,
232 			const char *name,
233 			unsigned int namelen);
234 static void dlm_put_mle(struct dlm_master_list_entry *mle);
235 static void __dlm_put_mle(struct dlm_master_list_entry *mle);
236 static int dlm_find_mle(struct dlm_ctxt *dlm,
237 			struct dlm_master_list_entry **mle,
238 			char *name, unsigned int namelen);
239 
240 static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to);
241 
242 
243 static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
244 				     struct dlm_lock_resource *res,
245 				     struct dlm_master_list_entry *mle,
246 				     int *blocked);
247 static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
248 				    struct dlm_lock_resource *res,
249 				    struct dlm_master_list_entry *mle,
250 				    int blocked);
251 static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
252 				 struct dlm_lock_resource *res,
253 				 struct dlm_master_list_entry *mle,
254 				 struct dlm_master_list_entry **oldmle,
255 				 const char *name, unsigned int namelen,
256 				 u8 new_master, u8 master);
257 
258 static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
259 				    struct dlm_lock_resource *res);
260 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
261 				      struct dlm_lock_resource *res);
262 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
263 				       struct dlm_lock_resource *res,
264 				       u8 target);
265 static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
266 				       struct dlm_lock_resource *res);
267 
268 
269 int dlm_is_host_down(int errno)
270 {
271 	switch (errno) {
272 		case -EBADF:
273 		case -ECONNREFUSED:
274 		case -ENOTCONN:
275 		case -ECONNRESET:
276 		case -EPIPE:
277 		case -EHOSTDOWN:
278 		case -EHOSTUNREACH:
279 		case -ETIMEDOUT:
280 		case -ECONNABORTED:
281 		case -ENETDOWN:
282 		case -ENETUNREACH:
283 		case -ENETRESET:
284 		case -ESHUTDOWN:
285 		case -ENOPROTOOPT:
286 		case -EINVAL:   /* if returned from our tcp code,
287 				   this means there is no socket */
288 			return 1;
289 	}
290 	return 0;
291 }
292 
293 
294 /*
295  * MASTER LIST FUNCTIONS
296  */
297 
298 
299 /*
300  * regarding master list entries and heartbeat callbacks:
301  *
302  * in order to avoid sleeping and allocation that occurs in
303  * heartbeat, master list entries are simply attached to the
304  * dlm's established heartbeat callbacks.  the mle is attached
305  * when it is created, and since the dlm->spinlock is held at
306  * that time, any heartbeat event will be properly discovered
307  * by the mle.  the mle needs to be detached from the
308  * dlm->mle_hb_events list as soon as heartbeat events are no
309  * longer useful to the mle, and before the mle is freed.
310  *
311  * as a general rule, heartbeat events are no longer needed by
312  * the mle once an "answer" regarding the lock master has been
313  * received.
314  */
315 static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
316 					      struct dlm_master_list_entry *mle)
317 {
318 	assert_spin_locked(&dlm->spinlock);
319 
320 	list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
321 }
322 
323 
324 static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
325 					      struct dlm_master_list_entry *mle)
326 {
327 	if (!list_empty(&mle->hb_events))
328 		list_del_init(&mle->hb_events);
329 }
330 
331 
332 static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
333 					    struct dlm_master_list_entry *mle)
334 {
335 	spin_lock(&dlm->spinlock);
336 	__dlm_mle_detach_hb_events(dlm, mle);
337 	spin_unlock(&dlm->spinlock);
338 }
339 
340 static void dlm_get_mle_inuse(struct dlm_master_list_entry *mle)
341 {
342 	struct dlm_ctxt *dlm;
343 	dlm = mle->dlm;
344 
345 	assert_spin_locked(&dlm->spinlock);
346 	assert_spin_locked(&dlm->master_lock);
347 	mle->inuse++;
348 	kref_get(&mle->mle_refs);
349 }
350 
351 static void dlm_put_mle_inuse(struct dlm_master_list_entry *mle)
352 {
353 	struct dlm_ctxt *dlm;
354 	dlm = mle->dlm;
355 
356 	spin_lock(&dlm->spinlock);
357 	spin_lock(&dlm->master_lock);
358 	mle->inuse--;
359 	__dlm_put_mle(mle);
360 	spin_unlock(&dlm->master_lock);
361 	spin_unlock(&dlm->spinlock);
362 
363 }
364 
365 /* remove from list and free */
366 static void __dlm_put_mle(struct dlm_master_list_entry *mle)
367 {
368 	struct dlm_ctxt *dlm;
369 	dlm = mle->dlm;
370 
371 	assert_spin_locked(&dlm->spinlock);
372 	assert_spin_locked(&dlm->master_lock);
373 	if (!atomic_read(&mle->mle_refs.refcount)) {
374 		/* this may or may not crash, but who cares.
375 		 * it's a BUG. */
376 		mlog(ML_ERROR, "bad mle: %p\n", mle);
377 		dlm_print_one_mle(mle);
378 		BUG();
379 	} else
380 		kref_put(&mle->mle_refs, dlm_mle_release);
381 }
382 
383 
384 /* must not have any spinlocks coming in */
385 static void dlm_put_mle(struct dlm_master_list_entry *mle)
386 {
387 	struct dlm_ctxt *dlm;
388 	dlm = mle->dlm;
389 
390 	spin_lock(&dlm->spinlock);
391 	spin_lock(&dlm->master_lock);
392 	__dlm_put_mle(mle);
393 	spin_unlock(&dlm->master_lock);
394 	spin_unlock(&dlm->spinlock);
395 }
396 
397 static inline void dlm_get_mle(struct dlm_master_list_entry *mle)
398 {
399 	kref_get(&mle->mle_refs);
400 }
401 
402 static void dlm_init_mle(struct dlm_master_list_entry *mle,
403 			enum dlm_mle_type type,
404 			struct dlm_ctxt *dlm,
405 			struct dlm_lock_resource *res,
406 			const char *name,
407 			unsigned int namelen)
408 {
409 	assert_spin_locked(&dlm->spinlock);
410 
411 	mle->dlm = dlm;
412 	mle->type = type;
413 	INIT_LIST_HEAD(&mle->list);
414 	INIT_LIST_HEAD(&mle->hb_events);
415 	memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
416 	spin_lock_init(&mle->spinlock);
417 	init_waitqueue_head(&mle->wq);
418 	atomic_set(&mle->woken, 0);
419 	kref_init(&mle->mle_refs);
420 	memset(mle->response_map, 0, sizeof(mle->response_map));
421 	mle->master = O2NM_MAX_NODES;
422 	mle->new_master = O2NM_MAX_NODES;
423 	mle->inuse = 0;
424 
425 	if (mle->type == DLM_MLE_MASTER) {
426 		BUG_ON(!res);
427 		mle->u.res = res;
428 	} else if (mle->type == DLM_MLE_BLOCK) {
429 		BUG_ON(!name);
430 		memcpy(mle->u.name.name, name, namelen);
431 		mle->u.name.len = namelen;
432 	} else /* DLM_MLE_MIGRATION */ {
433 		BUG_ON(!name);
434 		memcpy(mle->u.name.name, name, namelen);
435 		mle->u.name.len = namelen;
436 	}
437 
438 	/* copy off the node_map and register hb callbacks on our copy */
439 	memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map));
440 	memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map));
441 	clear_bit(dlm->node_num, mle->vote_map);
442 	clear_bit(dlm->node_num, mle->node_map);
443 
444 	/* attach the mle to the domain node up/down events */
445 	__dlm_mle_attach_hb_events(dlm, mle);
446 }
447 
448 
449 /* returns 1 if found, 0 if not */
450 static int dlm_find_mle(struct dlm_ctxt *dlm,
451 			struct dlm_master_list_entry **mle,
452 			char *name, unsigned int namelen)
453 {
454 	struct dlm_master_list_entry *tmpmle;
455 	struct list_head *iter;
456 
457 	assert_spin_locked(&dlm->master_lock);
458 
459 	list_for_each(iter, &dlm->master_list) {
460 		tmpmle = list_entry(iter, struct dlm_master_list_entry, list);
461 		if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
462 			continue;
463 		dlm_get_mle(tmpmle);
464 		*mle = tmpmle;
465 		return 1;
466 	}
467 	return 0;
468 }
469 
470 void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
471 {
472 	struct dlm_master_list_entry *mle;
473 	struct list_head *iter;
474 
475 	assert_spin_locked(&dlm->spinlock);
476 
477 	list_for_each(iter, &dlm->mle_hb_events) {
478 		mle = list_entry(iter, struct dlm_master_list_entry,
479 				 hb_events);
480 		if (node_up)
481 			dlm_mle_node_up(dlm, mle, NULL, idx);
482 		else
483 			dlm_mle_node_down(dlm, mle, NULL, idx);
484 	}
485 }
486 
487 static void dlm_mle_node_down(struct dlm_ctxt *dlm,
488 			      struct dlm_master_list_entry *mle,
489 			      struct o2nm_node *node, int idx)
490 {
491 	spin_lock(&mle->spinlock);
492 
493 	if (!test_bit(idx, mle->node_map))
494 		mlog(0, "node %u already removed from nodemap!\n", idx);
495 	else
496 		clear_bit(idx, mle->node_map);
497 
498 	spin_unlock(&mle->spinlock);
499 }
500 
501 static void dlm_mle_node_up(struct dlm_ctxt *dlm,
502 			    struct dlm_master_list_entry *mle,
503 			    struct o2nm_node *node, int idx)
504 {
505 	spin_lock(&mle->spinlock);
506 
507 	if (test_bit(idx, mle->node_map))
508 		mlog(0, "node %u already in node map!\n", idx);
509 	else
510 		set_bit(idx, mle->node_map);
511 
512 	spin_unlock(&mle->spinlock);
513 }
514 
515 
516 int dlm_init_mle_cache(void)
517 {
518 	dlm_mle_cache = kmem_cache_create("dlm_mle_cache",
519 					  sizeof(struct dlm_master_list_entry),
520 					  0, SLAB_HWCACHE_ALIGN,
521 					  NULL, NULL);
522 	if (dlm_mle_cache == NULL)
523 		return -ENOMEM;
524 	return 0;
525 }
526 
527 void dlm_destroy_mle_cache(void)
528 {
529 	if (dlm_mle_cache)
530 		kmem_cache_destroy(dlm_mle_cache);
531 }
532 
533 static void dlm_mle_release(struct kref *kref)
534 {
535 	struct dlm_master_list_entry *mle;
536 	struct dlm_ctxt *dlm;
537 
538 	mlog_entry_void();
539 
540 	mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
541 	dlm = mle->dlm;
542 
543 	if (mle->type != DLM_MLE_MASTER) {
544 		mlog(0, "calling mle_release for %.*s, type %d\n",
545 		     mle->u.name.len, mle->u.name.name, mle->type);
546 	} else {
547 		mlog(0, "calling mle_release for %.*s, type %d\n",
548 		     mle->u.res->lockname.len,
549 		     mle->u.res->lockname.name, mle->type);
550 	}
551 	assert_spin_locked(&dlm->spinlock);
552 	assert_spin_locked(&dlm->master_lock);
553 
554 	/* remove from list if not already */
555 	if (!list_empty(&mle->list))
556 		list_del_init(&mle->list);
557 
558 	/* detach the mle from the domain node up/down events */
559 	__dlm_mle_detach_hb_events(dlm, mle);
560 
561 	/* NOTE: kfree under spinlock here.
562 	 * if this is bad, we can move this to a freelist. */
563 	kmem_cache_free(dlm_mle_cache, mle);
564 }
565 
566 
567 /*
568  * LOCK RESOURCE FUNCTIONS
569  */
570 
571 static void dlm_set_lockres_owner(struct dlm_ctxt *dlm,
572 				  struct dlm_lock_resource *res,
573 				  u8 owner)
574 {
575 	assert_spin_locked(&res->spinlock);
576 
577 	mlog_entry("%.*s, %u\n", res->lockname.len, res->lockname.name, owner);
578 
579 	if (owner == dlm->node_num)
580 		atomic_inc(&dlm->local_resources);
581 	else if (owner == DLM_LOCK_RES_OWNER_UNKNOWN)
582 		atomic_inc(&dlm->unknown_resources);
583 	else
584 		atomic_inc(&dlm->remote_resources);
585 
586 	res->owner = owner;
587 }
588 
589 void dlm_change_lockres_owner(struct dlm_ctxt *dlm,
590 			      struct dlm_lock_resource *res, u8 owner)
591 {
592 	assert_spin_locked(&res->spinlock);
593 
594 	if (owner == res->owner)
595 		return;
596 
597 	if (res->owner == dlm->node_num)
598 		atomic_dec(&dlm->local_resources);
599 	else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN)
600 		atomic_dec(&dlm->unknown_resources);
601 	else
602 		atomic_dec(&dlm->remote_resources);
603 
604 	dlm_set_lockres_owner(dlm, res, owner);
605 }
606 
607 
608 static void dlm_lockres_release(struct kref *kref)
609 {
610 	struct dlm_lock_resource *res;
611 
612 	res = container_of(kref, struct dlm_lock_resource, refs);
613 
614 	/* This should not happen -- all lockres' have a name
615 	 * associated with them at init time. */
616 	BUG_ON(!res->lockname.name);
617 
618 	mlog(0, "destroying lockres %.*s\n", res->lockname.len,
619 	     res->lockname.name);
620 
621 	if (!hlist_unhashed(&res->hash_node) ||
622 	    !list_empty(&res->granted) ||
623 	    !list_empty(&res->converting) ||
624 	    !list_empty(&res->blocked) ||
625 	    !list_empty(&res->dirty) ||
626 	    !list_empty(&res->recovering) ||
627 	    !list_empty(&res->purge)) {
628 		mlog(ML_ERROR,
629 		     "Going to BUG for resource %.*s."
630 		     "  We're on a list! [%c%c%c%c%c%c%c]\n",
631 		     res->lockname.len, res->lockname.name,
632 		     !hlist_unhashed(&res->hash_node) ? 'H' : ' ',
633 		     !list_empty(&res->granted) ? 'G' : ' ',
634 		     !list_empty(&res->converting) ? 'C' : ' ',
635 		     !list_empty(&res->blocked) ? 'B' : ' ',
636 		     !list_empty(&res->dirty) ? 'D' : ' ',
637 		     !list_empty(&res->recovering) ? 'R' : ' ',
638 		     !list_empty(&res->purge) ? 'P' : ' ');
639 
640 		dlm_print_one_lock_resource(res);
641 	}
642 
643 	/* By the time we're ready to blow this guy away, we shouldn't
644 	 * be on any lists. */
645 	BUG_ON(!hlist_unhashed(&res->hash_node));
646 	BUG_ON(!list_empty(&res->granted));
647 	BUG_ON(!list_empty(&res->converting));
648 	BUG_ON(!list_empty(&res->blocked));
649 	BUG_ON(!list_empty(&res->dirty));
650 	BUG_ON(!list_empty(&res->recovering));
651 	BUG_ON(!list_empty(&res->purge));
652 
653 	kfree(res->lockname.name);
654 
655 	kfree(res);
656 }
657 
658 void dlm_lockres_put(struct dlm_lock_resource *res)
659 {
660 	kref_put(&res->refs, dlm_lockres_release);
661 }
662 
663 static void dlm_init_lockres(struct dlm_ctxt *dlm,
664 			     struct dlm_lock_resource *res,
665 			     const char *name, unsigned int namelen)
666 {
667 	char *qname;
668 
669 	/* If we memset here, we lose our reference to the kmalloc'd
670 	 * res->lockname.name, so be sure to init every field
671 	 * correctly! */
672 
673 	qname = (char *) res->lockname.name;
674 	memcpy(qname, name, namelen);
675 
676 	res->lockname.len = namelen;
677 	res->lockname.hash = dlm_lockid_hash(name, namelen);
678 
679 	init_waitqueue_head(&res->wq);
680 	spin_lock_init(&res->spinlock);
681 	INIT_HLIST_NODE(&res->hash_node);
682 	INIT_LIST_HEAD(&res->granted);
683 	INIT_LIST_HEAD(&res->converting);
684 	INIT_LIST_HEAD(&res->blocked);
685 	INIT_LIST_HEAD(&res->dirty);
686 	INIT_LIST_HEAD(&res->recovering);
687 	INIT_LIST_HEAD(&res->purge);
688 	atomic_set(&res->asts_reserved, 0);
689 	res->migration_pending = 0;
690 
691 	kref_init(&res->refs);
692 
693 	/* just for consistency */
694 	spin_lock(&res->spinlock);
695 	dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
696 	spin_unlock(&res->spinlock);
697 
698 	res->state = DLM_LOCK_RES_IN_PROGRESS;
699 
700 	res->last_used = 0;
701 
702 	memset(res->lvb, 0, DLM_LVB_LEN);
703 }
704 
705 struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
706 				   const char *name,
707 				   unsigned int namelen)
708 {
709 	struct dlm_lock_resource *res;
710 
711 	res = kmalloc(sizeof(struct dlm_lock_resource), GFP_NOFS);
712 	if (!res)
713 		return NULL;
714 
715 	res->lockname.name = kmalloc(namelen, GFP_NOFS);
716 	if (!res->lockname.name) {
717 		kfree(res);
718 		return NULL;
719 	}
720 
721 	dlm_init_lockres(dlm, res, name, namelen);
722 	return res;
723 }
724 
725 /*
726  * lookup a lock resource by name.
727  * may already exist in the hashtable.
728  * lockid is null terminated
729  *
730  * if not, allocate enough for the lockres and for
731  * the temporary structure used in doing the mastering.
732  *
733  * also, do a lookup in the dlm->master_list to see
734  * if another node has begun mastering the same lock.
735  * if so, there should be a block entry in there
736  * for this name, and we should *not* attempt to master
737  * the lock here.   need to wait around for that node
738  * to assert_master (or die).
739  *
740  */
741 struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
742 					  const char *lockid,
743 					  int flags)
744 {
745 	struct dlm_lock_resource *tmpres=NULL, *res=NULL;
746 	struct dlm_master_list_entry *mle = NULL;
747 	struct dlm_master_list_entry *alloc_mle = NULL;
748 	int blocked = 0;
749 	int ret, nodenum;
750 	struct dlm_node_iter iter;
751 	unsigned int namelen, hash;
752 	int tries = 0;
753 	int bit, wait_on_recovery = 0;
754 
755 	BUG_ON(!lockid);
756 
757 	namelen = strlen(lockid);
758 	hash = dlm_lockid_hash(lockid, namelen);
759 
760 	mlog(0, "get lockres %s (len %d)\n", lockid, namelen);
761 
762 lookup:
763 	spin_lock(&dlm->spinlock);
764 	tmpres = __dlm_lookup_lockres(dlm, lockid, namelen, hash);
765 	if (tmpres) {
766 		spin_unlock(&dlm->spinlock);
767 		mlog(0, "found in hash!\n");
768 		if (res)
769 			dlm_lockres_put(res);
770 		res = tmpres;
771 		goto leave;
772 	}
773 
774 	if (!res) {
775 		spin_unlock(&dlm->spinlock);
776 		mlog(0, "allocating a new resource\n");
777 		/* nothing found and we need to allocate one. */
778 		alloc_mle = (struct dlm_master_list_entry *)
779 			kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
780 		if (!alloc_mle)
781 			goto leave;
782 		res = dlm_new_lockres(dlm, lockid, namelen);
783 		if (!res)
784 			goto leave;
785 		goto lookup;
786 	}
787 
788 	mlog(0, "no lockres found, allocated our own: %p\n", res);
789 
790 	if (flags & LKM_LOCAL) {
791 		/* caller knows it's safe to assume it's not mastered elsewhere
792 		 * DONE!  return right away */
793 		spin_lock(&res->spinlock);
794 		dlm_change_lockres_owner(dlm, res, dlm->node_num);
795 		__dlm_insert_lockres(dlm, res);
796 		spin_unlock(&res->spinlock);
797 		spin_unlock(&dlm->spinlock);
798 		/* lockres still marked IN_PROGRESS */
799 		goto wake_waiters;
800 	}
801 
802 	/* check master list to see if another node has started mastering it */
803 	spin_lock(&dlm->master_lock);
804 
805 	/* if we found a block, wait for lock to be mastered by another node */
806 	blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
807 	if (blocked) {
808 		if (mle->type == DLM_MLE_MASTER) {
809 			mlog(ML_ERROR, "master entry for nonexistent lock!\n");
810 			BUG();
811 		} else if (mle->type == DLM_MLE_MIGRATION) {
812 			/* migration is in progress! */
813 			/* the good news is that we now know the
814 			 * "current" master (mle->master). */
815 
816 			spin_unlock(&dlm->master_lock);
817 			assert_spin_locked(&dlm->spinlock);
818 
819 			/* set the lockres owner and hash it */
820 			spin_lock(&res->spinlock);
821 			dlm_set_lockres_owner(dlm, res, mle->master);
822 			__dlm_insert_lockres(dlm, res);
823 			spin_unlock(&res->spinlock);
824 			spin_unlock(&dlm->spinlock);
825 
826 			/* master is known, detach */
827 			dlm_mle_detach_hb_events(dlm, mle);
828 			dlm_put_mle(mle);
829 			mle = NULL;
830 			goto wake_waiters;
831 		}
832 	} else {
833 		/* go ahead and try to master lock on this node */
834 		mle = alloc_mle;
835 		/* make sure this does not get freed below */
836 		alloc_mle = NULL;
837 		dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
838 		set_bit(dlm->node_num, mle->maybe_map);
839 		list_add(&mle->list, &dlm->master_list);
840 
841 		/* still holding the dlm spinlock, check the recovery map
842 		 * to see if there are any nodes that still need to be
843 		 * considered.  these will not appear in the mle nodemap
844 		 * but they might own this lockres.  wait on them. */
845 		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
846 		if (bit < O2NM_MAX_NODES) {
847 			mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to"
848 			     "recover before lock mastery can begin\n",
849 			     dlm->name, namelen, (char *)lockid, bit);
850 			wait_on_recovery = 1;
851 		}
852 	}
853 
854 	/* at this point there is either a DLM_MLE_BLOCK or a
855 	 * DLM_MLE_MASTER on the master list, so it's safe to add the
856 	 * lockres to the hashtable.  anyone who finds the lock will
857 	 * still have to wait on the IN_PROGRESS. */
858 
859 	/* finally add the lockres to its hash bucket */
860 	__dlm_insert_lockres(dlm, res);
861 	/* get an extra ref on the mle in case this is a BLOCK
862 	 * if so, the creator of the BLOCK may try to put the last
863 	 * ref at this time in the assert master handler, so we
864 	 * need an extra one to keep from a bad ptr deref. */
865 	dlm_get_mle_inuse(mle);
866 	spin_unlock(&dlm->master_lock);
867 	spin_unlock(&dlm->spinlock);
868 
869 redo_request:
870 	while (wait_on_recovery) {
871 		/* any cluster changes that occurred after dropping the
872 		 * dlm spinlock would be detectable be a change on the mle,
873 		 * so we only need to clear out the recovery map once. */
874 		if (dlm_is_recovery_lock(lockid, namelen)) {
875 			mlog(ML_NOTICE, "%s: recovery map is not empty, but "
876 			     "must master $RECOVERY lock now\n", dlm->name);
877 			if (!dlm_pre_master_reco_lockres(dlm, res))
878 				wait_on_recovery = 0;
879 			else {
880 				mlog(0, "%s: waiting 500ms for heartbeat state "
881 				    "change\n", dlm->name);
882 				msleep(500);
883 			}
884 			continue;
885 		}
886 
887 		dlm_kick_recovery_thread(dlm);
888 		msleep(1000);
889 		dlm_wait_for_recovery(dlm);
890 
891 		spin_lock(&dlm->spinlock);
892 		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
893 		if (bit < O2NM_MAX_NODES) {
894 			mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to"
895 			     "recover before lock mastery can begin\n",
896 			     dlm->name, namelen, (char *)lockid, bit);
897 			wait_on_recovery = 1;
898 		} else
899 			wait_on_recovery = 0;
900 		spin_unlock(&dlm->spinlock);
901 
902 		if (wait_on_recovery)
903 			dlm_wait_for_node_recovery(dlm, bit, 10000);
904 	}
905 
906 	/* must wait for lock to be mastered elsewhere */
907 	if (blocked)
908 		goto wait;
909 
910 	ret = -EINVAL;
911 	dlm_node_iter_init(mle->vote_map, &iter);
912 	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
913 		ret = dlm_do_master_request(mle, nodenum);
914 		if (ret < 0)
915 			mlog_errno(ret);
916 		if (mle->master != O2NM_MAX_NODES) {
917 			/* found a master ! */
918 			if (mle->master <= nodenum)
919 				break;
920 			/* if our master request has not reached the master
921 			 * yet, keep going until it does.  this is how the
922 			 * master will know that asserts are needed back to
923 			 * the lower nodes. */
924 			mlog(0, "%s:%.*s: requests only up to %u but master "
925 			     "is %u, keep going\n", dlm->name, namelen,
926 			     lockid, nodenum, mle->master);
927 		}
928 	}
929 
930 wait:
931 	/* keep going until the response map includes all nodes */
932 	ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
933 	if (ret < 0) {
934 		wait_on_recovery = 1;
935 		mlog(0, "%s:%.*s: node map changed, redo the "
936 		     "master request now, blocked=%d\n",
937 		     dlm->name, res->lockname.len,
938 		     res->lockname.name, blocked);
939 		if (++tries > 20) {
940 			mlog(ML_ERROR, "%s:%.*s: spinning on "
941 			     "dlm_wait_for_lock_mastery, blocked=%d\n",
942 			     dlm->name, res->lockname.len,
943 			     res->lockname.name, blocked);
944 			dlm_print_one_lock_resource(res);
945 			dlm_print_one_mle(mle);
946 			tries = 0;
947 		}
948 		goto redo_request;
949 	}
950 
951 	mlog(0, "lockres mastered by %u\n", res->owner);
952 	/* make sure we never continue without this */
953 	BUG_ON(res->owner == O2NM_MAX_NODES);
954 
955 	/* master is known, detach if not already detached */
956 	dlm_mle_detach_hb_events(dlm, mle);
957 	dlm_put_mle(mle);
958 	/* put the extra ref */
959 	dlm_put_mle_inuse(mle);
960 
961 wake_waiters:
962 	spin_lock(&res->spinlock);
963 	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
964 	spin_unlock(&res->spinlock);
965 	wake_up(&res->wq);
966 
967 leave:
968 	/* need to free the unused mle */
969 	if (alloc_mle)
970 		kmem_cache_free(dlm_mle_cache, alloc_mle);
971 
972 	return res;
973 }
974 
975 
976 #define DLM_MASTERY_TIMEOUT_MS   5000
977 
978 static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
979 				     struct dlm_lock_resource *res,
980 				     struct dlm_master_list_entry *mle,
981 				     int *blocked)
982 {
983 	u8 m;
984 	int ret, bit;
985 	int map_changed, voting_done;
986 	int assert, sleep;
987 
988 recheck:
989 	ret = 0;
990 	assert = 0;
991 
992 	/* check if another node has already become the owner */
993 	spin_lock(&res->spinlock);
994 	if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
995 		mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
996 		     res->lockname.len, res->lockname.name, res->owner);
997 		spin_unlock(&res->spinlock);
998 		/* this will cause the master to re-assert across
999 		 * the whole cluster, freeing up mles */
1000 		if (res->owner != dlm->node_num) {
1001 			ret = dlm_do_master_request(mle, res->owner);
1002 			if (ret < 0) {
1003 				/* give recovery a chance to run */
1004 				mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
1005 				msleep(500);
1006 				goto recheck;
1007 			}
1008 		}
1009 		ret = 0;
1010 		goto leave;
1011 	}
1012 	spin_unlock(&res->spinlock);
1013 
1014 	spin_lock(&mle->spinlock);
1015 	m = mle->master;
1016 	map_changed = (memcmp(mle->vote_map, mle->node_map,
1017 			      sizeof(mle->vote_map)) != 0);
1018 	voting_done = (memcmp(mle->vote_map, mle->response_map,
1019 			     sizeof(mle->vote_map)) == 0);
1020 
1021 	/* restart if we hit any errors */
1022 	if (map_changed) {
1023 		int b;
1024 		mlog(0, "%s: %.*s: node map changed, restarting\n",
1025 		     dlm->name, res->lockname.len, res->lockname.name);
1026 		ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
1027 		b = (mle->type == DLM_MLE_BLOCK);
1028 		if ((*blocked && !b) || (!*blocked && b)) {
1029 			mlog(0, "%s:%.*s: status change: old=%d new=%d\n",
1030 			     dlm->name, res->lockname.len, res->lockname.name,
1031 			     *blocked, b);
1032 			*blocked = b;
1033 		}
1034 		spin_unlock(&mle->spinlock);
1035 		if (ret < 0) {
1036 			mlog_errno(ret);
1037 			goto leave;
1038 		}
1039 		mlog(0, "%s:%.*s: restart lock mastery succeeded, "
1040 		     "rechecking now\n", dlm->name, res->lockname.len,
1041 		     res->lockname.name);
1042 		goto recheck;
1043 	} else {
1044 		if (!voting_done) {
1045 			mlog(0, "map not changed and voting not done "
1046 			     "for %s:%.*s\n", dlm->name, res->lockname.len,
1047 			     res->lockname.name);
1048 		}
1049 	}
1050 
1051 	if (m != O2NM_MAX_NODES) {
1052 		/* another node has done an assert!
1053 		 * all done! */
1054 		sleep = 0;
1055 	} else {
1056 		sleep = 1;
1057 		/* have all nodes responded? */
1058 		if (voting_done && !*blocked) {
1059 			bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
1060 			if (dlm->node_num <= bit) {
1061 				/* my node number is lowest.
1062 			 	 * now tell other nodes that I am
1063 				 * mastering this. */
1064 				mle->master = dlm->node_num;
1065 				assert = 1;
1066 				sleep = 0;
1067 			}
1068 			/* if voting is done, but we have not received
1069 			 * an assert master yet, we must sleep */
1070 		}
1071 	}
1072 
1073 	spin_unlock(&mle->spinlock);
1074 
1075 	/* sleep if we haven't finished voting yet */
1076 	if (sleep) {
1077 		unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);
1078 
1079 		/*
1080 		if (atomic_read(&mle->mle_refs.refcount) < 2)
1081 			mlog(ML_ERROR, "mle (%p) refs=%d, name=%.*s\n", mle,
1082 			atomic_read(&mle->mle_refs.refcount),
1083 			res->lockname.len, res->lockname.name);
1084 		*/
1085 		atomic_set(&mle->woken, 0);
1086 		(void)wait_event_timeout(mle->wq,
1087 					 (atomic_read(&mle->woken) == 1),
1088 					 timeo);
1089 		if (res->owner == O2NM_MAX_NODES) {
1090 			mlog(0, "waiting again\n");
1091 			goto recheck;
1092 		}
1093 		mlog(0, "done waiting, master is %u\n", res->owner);
1094 		ret = 0;
1095 		goto leave;
1096 	}
1097 
1098 	ret = 0;   /* done */
1099 	if (assert) {
1100 		m = dlm->node_num;
1101 		mlog(0, "about to master %.*s here, this=%u\n",
1102 		     res->lockname.len, res->lockname.name, m);
1103 		ret = dlm_do_assert_master(dlm, res->lockname.name,
1104 					   res->lockname.len, mle->vote_map, 0);
1105 		if (ret) {
1106 			/* This is a failure in the network path,
1107 			 * not in the response to the assert_master
1108 			 * (any nonzero response is a BUG on this node).
1109 			 * Most likely a socket just got disconnected
1110 			 * due to node death. */
1111 			mlog_errno(ret);
1112 		}
1113 		/* no longer need to restart lock mastery.
1114 		 * all living nodes have been contacted. */
1115 		ret = 0;
1116 	}
1117 
1118 	/* set the lockres owner */
1119 	spin_lock(&res->spinlock);
1120 	dlm_change_lockres_owner(dlm, res, m);
1121 	spin_unlock(&res->spinlock);
1122 
1123 leave:
1124 	return ret;
1125 }
1126 
1127 struct dlm_bitmap_diff_iter
1128 {
1129 	int curnode;
1130 	unsigned long *orig_bm;
1131 	unsigned long *cur_bm;
1132 	unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
1133 };
1134 
1135 enum dlm_node_state_change
1136 {
1137 	NODE_DOWN = -1,
1138 	NODE_NO_CHANGE = 0,
1139 	NODE_UP
1140 };
1141 
1142 static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
1143 				      unsigned long *orig_bm,
1144 				      unsigned long *cur_bm)
1145 {
1146 	unsigned long p1, p2;
1147 	int i;
1148 
1149 	iter->curnode = -1;
1150 	iter->orig_bm = orig_bm;
1151 	iter->cur_bm = cur_bm;
1152 
1153 	for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
1154        		p1 = *(iter->orig_bm + i);
1155 	       	p2 = *(iter->cur_bm + i);
1156 		iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
1157 	}
1158 }
1159 
1160 static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
1161 				     enum dlm_node_state_change *state)
1162 {
1163 	int bit;
1164 
1165 	if (iter->curnode >= O2NM_MAX_NODES)
1166 		return -ENOENT;
1167 
1168 	bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
1169 			    iter->curnode+1);
1170 	if (bit >= O2NM_MAX_NODES) {
1171 		iter->curnode = O2NM_MAX_NODES;
1172 		return -ENOENT;
1173 	}
1174 
1175 	/* if it was there in the original then this node died */
1176 	if (test_bit(bit, iter->orig_bm))
1177 		*state = NODE_DOWN;
1178 	else
1179 		*state = NODE_UP;
1180 
1181 	iter->curnode = bit;
1182 	return bit;
1183 }
1184 
1185 
1186 static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
1187 				    struct dlm_lock_resource *res,
1188 				    struct dlm_master_list_entry *mle,
1189 				    int blocked)
1190 {
1191 	struct dlm_bitmap_diff_iter bdi;
1192 	enum dlm_node_state_change sc;
1193 	int node;
1194 	int ret = 0;
1195 
1196 	mlog(0, "something happened such that the "
1197 	     "master process may need to be restarted!\n");
1198 
1199 	assert_spin_locked(&mle->spinlock);
1200 
1201 	dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
1202 	node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1203 	while (node >= 0) {
1204 		if (sc == NODE_UP) {
1205 			/* a node came up.  clear any old vote from
1206 			 * the response map and set it in the vote map
1207 			 * then restart the mastery. */
1208 			mlog(ML_NOTICE, "node %d up while restarting\n", node);
1209 
1210 			/* redo the master request, but only for the new node */
1211 			mlog(0, "sending request to new node\n");
1212 			clear_bit(node, mle->response_map);
1213 			set_bit(node, mle->vote_map);
1214 		} else {
1215 			mlog(ML_ERROR, "node down! %d\n", node);
1216 			if (blocked) {
1217 				int lowest = find_next_bit(mle->maybe_map,
1218 						       O2NM_MAX_NODES, 0);
1219 
1220 				/* act like it was never there */
1221 				clear_bit(node, mle->maybe_map);
1222 
1223 			       	if (node == lowest) {
1224 					mlog(0, "expected master %u died"
1225 					    " while this node was blocked "
1226 					    "waiting on it!\n", node);
1227 					lowest = find_next_bit(mle->maybe_map,
1228 						       	O2NM_MAX_NODES,
1229 						       	lowest+1);
1230 					if (lowest < O2NM_MAX_NODES) {
1231 						mlog(0, "%s:%.*s:still "
1232 						     "blocked. waiting on %u "
1233 						     "now\n", dlm->name,
1234 						     res->lockname.len,
1235 						     res->lockname.name,
1236 						     lowest);
1237 					} else {
1238 						/* mle is an MLE_BLOCK, but
1239 						 * there is now nothing left to
1240 						 * block on.  we need to return
1241 						 * all the way back out and try
1242 						 * again with an MLE_MASTER.
1243 						 * dlm_do_local_recovery_cleanup
1244 						 * has already run, so the mle
1245 						 * refcount is ok */
1246 						mlog(0, "%s:%.*s: no "
1247 						     "longer blocking. try to "
1248 						     "master this here\n",
1249 						     dlm->name,
1250 						     res->lockname.len,
1251 						     res->lockname.name);
1252 						mle->type = DLM_MLE_MASTER;
1253 						mle->u.res = res;
1254 					}
1255 				}
1256 			}
1257 
1258 			/* now blank out everything, as if we had never
1259 			 * contacted anyone */
1260 			memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
1261 			memset(mle->response_map, 0, sizeof(mle->response_map));
1262 			/* reset the vote_map to the current node_map */
1263 			memcpy(mle->vote_map, mle->node_map,
1264 			       sizeof(mle->node_map));
1265 			/* put myself into the maybe map */
1266 			if (mle->type != DLM_MLE_BLOCK)
1267 				set_bit(dlm->node_num, mle->maybe_map);
1268 		}
1269 		ret = -EAGAIN;
1270 		node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1271 	}
1272 	return ret;
1273 }
1274 
1275 
1276 /*
1277  * DLM_MASTER_REQUEST_MSG
1278  *
1279  * returns: 0 on success,
1280  *          -errno on a network error
1281  *
1282  * on error, the caller should assume the target node is "dead"
1283  *
1284  */
1285 
1286 static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to)
1287 {
1288 	struct dlm_ctxt *dlm = mle->dlm;
1289 	struct dlm_master_request request;
1290 	int ret, response=0, resend;
1291 
1292 	memset(&request, 0, sizeof(request));
1293 	request.node_idx = dlm->node_num;
1294 
1295 	BUG_ON(mle->type == DLM_MLE_MIGRATION);
1296 
1297 	if (mle->type != DLM_MLE_MASTER) {
1298 		request.namelen = mle->u.name.len;
1299 		memcpy(request.name, mle->u.name.name, request.namelen);
1300 	} else {
1301 		request.namelen = mle->u.res->lockname.len;
1302 		memcpy(request.name, mle->u.res->lockname.name,
1303 			request.namelen);
1304 	}
1305 
1306 again:
1307 	ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
1308 				 sizeof(request), to, &response);
1309 	if (ret < 0)  {
1310 		if (ret == -ESRCH) {
1311 			/* should never happen */
1312 			mlog(ML_ERROR, "TCP stack not ready!\n");
1313 			BUG();
1314 		} else if (ret == -EINVAL) {
1315 			mlog(ML_ERROR, "bad args passed to o2net!\n");
1316 			BUG();
1317 		} else if (ret == -ENOMEM) {
1318 			mlog(ML_ERROR, "out of memory while trying to send "
1319 			     "network message!  retrying\n");
1320 			/* this is totally crude */
1321 			msleep(50);
1322 			goto again;
1323 		} else if (!dlm_is_host_down(ret)) {
1324 			/* not a network error. bad. */
1325 			mlog_errno(ret);
1326 			mlog(ML_ERROR, "unhandled error!");
1327 			BUG();
1328 		}
1329 		/* all other errors should be network errors,
1330 		 * and likely indicate node death */
1331 		mlog(ML_ERROR, "link to %d went down!\n", to);
1332 		goto out;
1333 	}
1334 
1335 	ret = 0;
1336 	resend = 0;
1337 	spin_lock(&mle->spinlock);
1338 	switch (response) {
1339 		case DLM_MASTER_RESP_YES:
1340 			set_bit(to, mle->response_map);
1341 			mlog(0, "node %u is the master, response=YES\n", to);
1342 			mle->master = to;
1343 			break;
1344 		case DLM_MASTER_RESP_NO:
1345 			mlog(0, "node %u not master, response=NO\n", to);
1346 			set_bit(to, mle->response_map);
1347 			break;
1348 		case DLM_MASTER_RESP_MAYBE:
1349 			mlog(0, "node %u not master, response=MAYBE\n", to);
1350 			set_bit(to, mle->response_map);
1351 			set_bit(to, mle->maybe_map);
1352 			break;
1353 		case DLM_MASTER_RESP_ERROR:
1354 			mlog(0, "node %u hit an error, resending\n", to);
1355 			resend = 1;
1356 			response = 0;
1357 			break;
1358 		default:
1359 			mlog(ML_ERROR, "bad response! %u\n", response);
1360 			BUG();
1361 	}
1362 	spin_unlock(&mle->spinlock);
1363 	if (resend) {
1364 		/* this is also totally crude */
1365 		msleep(50);
1366 		goto again;
1367 	}
1368 
1369 out:
1370 	return ret;
1371 }
1372 
1373 /*
1374  * locks that can be taken here:
1375  * dlm->spinlock
1376  * res->spinlock
1377  * mle->spinlock
1378  * dlm->master_list
1379  *
1380  * if possible, TRIM THIS DOWN!!!
1381  */
1382 int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
1383 {
1384 	u8 response = DLM_MASTER_RESP_MAYBE;
1385 	struct dlm_ctxt *dlm = data;
1386 	struct dlm_lock_resource *res = NULL;
1387 	struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
1388 	struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
1389 	char *name;
1390 	unsigned int namelen, hash;
1391 	int found, ret;
1392 	int set_maybe;
1393 	int dispatch_assert = 0;
1394 
1395 	if (!dlm_grab(dlm))
1396 		return DLM_MASTER_RESP_NO;
1397 
1398 	if (!dlm_domain_fully_joined(dlm)) {
1399 		response = DLM_MASTER_RESP_NO;
1400 		goto send_response;
1401 	}
1402 
1403 	name = request->name;
1404 	namelen = request->namelen;
1405 	hash = dlm_lockid_hash(name, namelen);
1406 
1407 	if (namelen > DLM_LOCKID_NAME_MAX) {
1408 		response = DLM_IVBUFLEN;
1409 		goto send_response;
1410 	}
1411 
1412 way_up_top:
1413 	spin_lock(&dlm->spinlock);
1414 	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1415 	if (res) {
1416 		spin_unlock(&dlm->spinlock);
1417 
1418 		/* take care of the easy cases up front */
1419 		spin_lock(&res->spinlock);
1420 		if (res->state & DLM_LOCK_RES_RECOVERING) {
1421 			spin_unlock(&res->spinlock);
1422 			mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
1423 			     "being recovered\n");
1424 			response = DLM_MASTER_RESP_ERROR;
1425 			if (mle)
1426 				kmem_cache_free(dlm_mle_cache, mle);
1427 			goto send_response;
1428 		}
1429 
1430 		if (res->owner == dlm->node_num) {
1431 			spin_unlock(&res->spinlock);
1432 			// mlog(0, "this node is the master\n");
1433 			response = DLM_MASTER_RESP_YES;
1434 			if (mle)
1435 				kmem_cache_free(dlm_mle_cache, mle);
1436 
1437 			/* this node is the owner.
1438 			 * there is some extra work that needs to
1439 			 * happen now.  the requesting node has
1440 			 * caused all nodes up to this one to
1441 			 * create mles.  this node now needs to
1442 			 * go back and clean those up. */
1443 			dispatch_assert = 1;
1444 			goto send_response;
1445 		} else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1446 			spin_unlock(&res->spinlock);
1447 			// mlog(0, "node %u is the master\n", res->owner);
1448 			response = DLM_MASTER_RESP_NO;
1449 			if (mle)
1450 				kmem_cache_free(dlm_mle_cache, mle);
1451 			goto send_response;
1452 		}
1453 
1454 		/* ok, there is no owner.  either this node is
1455 		 * being blocked, or it is actively trying to
1456 		 * master this lock. */
1457 		if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1458 			mlog(ML_ERROR, "lock with no owner should be "
1459 			     "in-progress!\n");
1460 			BUG();
1461 		}
1462 
1463 		// mlog(0, "lockres is in progress...\n");
1464 		spin_lock(&dlm->master_lock);
1465 		found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1466 		if (!found) {
1467 			mlog(ML_ERROR, "no mle found for this lock!\n");
1468 			BUG();
1469 		}
1470 		set_maybe = 1;
1471 		spin_lock(&tmpmle->spinlock);
1472 		if (tmpmle->type == DLM_MLE_BLOCK) {
1473 			// mlog(0, "this node is waiting for "
1474 			// "lockres to be mastered\n");
1475 			response = DLM_MASTER_RESP_NO;
1476 		} else if (tmpmle->type == DLM_MLE_MIGRATION) {
1477 			mlog(0, "node %u is master, but trying to migrate to "
1478 			     "node %u.\n", tmpmle->master, tmpmle->new_master);
1479 			if (tmpmle->master == dlm->node_num) {
1480 				response = DLM_MASTER_RESP_YES;
1481 				mlog(ML_ERROR, "no owner on lockres, but this "
1482 				     "node is trying to migrate it to %u?!\n",
1483 				     tmpmle->new_master);
1484 				BUG();
1485 			} else {
1486 				/* the real master can respond on its own */
1487 				response = DLM_MASTER_RESP_NO;
1488 			}
1489 		} else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
1490 			set_maybe = 0;
1491 			if (tmpmle->master == dlm->node_num) {
1492 				response = DLM_MASTER_RESP_YES;
1493 				/* this node will be the owner.
1494 				 * go back and clean the mles on any
1495 				 * other nodes */
1496 				dispatch_assert = 1;
1497 			} else
1498 				response = DLM_MASTER_RESP_NO;
1499 		} else {
1500 			// mlog(0, "this node is attempting to "
1501 			// "master lockres\n");
1502 			response = DLM_MASTER_RESP_MAYBE;
1503 		}
1504 		if (set_maybe)
1505 			set_bit(request->node_idx, tmpmle->maybe_map);
1506 		spin_unlock(&tmpmle->spinlock);
1507 
1508 		spin_unlock(&dlm->master_lock);
1509 		spin_unlock(&res->spinlock);
1510 
1511 		/* keep the mle attached to heartbeat events */
1512 		dlm_put_mle(tmpmle);
1513 		if (mle)
1514 			kmem_cache_free(dlm_mle_cache, mle);
1515 		goto send_response;
1516 	}
1517 
1518 	/*
1519 	 * lockres doesn't exist on this node
1520 	 * if there is an MLE_BLOCK, return NO
1521 	 * if there is an MLE_MASTER, return MAYBE
1522 	 * otherwise, add an MLE_BLOCK, return NO
1523 	 */
1524 	spin_lock(&dlm->master_lock);
1525 	found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1526 	if (!found) {
1527 		/* this lockid has never been seen on this node yet */
1528 		// mlog(0, "no mle found\n");
1529 		if (!mle) {
1530 			spin_unlock(&dlm->master_lock);
1531 			spin_unlock(&dlm->spinlock);
1532 
1533 			mle = (struct dlm_master_list_entry *)
1534 				kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
1535 			if (!mle) {
1536 				response = DLM_MASTER_RESP_ERROR;
1537 				mlog_errno(-ENOMEM);
1538 				goto send_response;
1539 			}
1540 			goto way_up_top;
1541 		}
1542 
1543 		// mlog(0, "this is second time thru, already allocated, "
1544 		// "add the block.\n");
1545 		dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);
1546 		set_bit(request->node_idx, mle->maybe_map);
1547 		list_add(&mle->list, &dlm->master_list);
1548 		response = DLM_MASTER_RESP_NO;
1549 	} else {
1550 		// mlog(0, "mle was found\n");
1551 		set_maybe = 1;
1552 		spin_lock(&tmpmle->spinlock);
1553 		if (tmpmle->master == dlm->node_num) {
1554 			mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n");
1555 			BUG();
1556 		}
1557 		if (tmpmle->type == DLM_MLE_BLOCK)
1558 			response = DLM_MASTER_RESP_NO;
1559 		else if (tmpmle->type == DLM_MLE_MIGRATION) {
1560 			mlog(0, "migration mle was found (%u->%u)\n",
1561 			     tmpmle->master, tmpmle->new_master);
1562 			/* real master can respond on its own */
1563 			response = DLM_MASTER_RESP_NO;
1564 		} else
1565 			response = DLM_MASTER_RESP_MAYBE;
1566 		if (set_maybe)
1567 			set_bit(request->node_idx, tmpmle->maybe_map);
1568 		spin_unlock(&tmpmle->spinlock);
1569 	}
1570 	spin_unlock(&dlm->master_lock);
1571 	spin_unlock(&dlm->spinlock);
1572 
1573 	if (found) {
1574 		/* keep the mle attached to heartbeat events */
1575 		dlm_put_mle(tmpmle);
1576 	}
1577 send_response:
1578 
1579 	if (dispatch_assert) {
1580 		if (response != DLM_MASTER_RESP_YES)
1581 			mlog(ML_ERROR, "invalid response %d\n", response);
1582 		if (!res) {
1583 			mlog(ML_ERROR, "bad lockres while trying to assert!\n");
1584 			BUG();
1585 		}
1586 		mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
1587 			     dlm->node_num, res->lockname.len, res->lockname.name);
1588 		ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx,
1589 						 DLM_ASSERT_MASTER_MLE_CLEANUP);
1590 		if (ret < 0) {
1591 			mlog(ML_ERROR, "failed to dispatch assert master work\n");
1592 			response = DLM_MASTER_RESP_ERROR;
1593 		}
1594 	}
1595 
1596 	dlm_put(dlm);
1597 	return response;
1598 }
1599 
1600 /*
1601  * DLM_ASSERT_MASTER_MSG
1602  */
1603 
1604 
1605 /*
1606  * NOTE: this can be used for debugging
1607  * can periodically run all locks owned by this node
1608  * and re-assert across the cluster...
1609  */
1610 static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
1611 				unsigned int namelen, void *nodemap,
1612 				u32 flags)
1613 {
1614 	struct dlm_assert_master assert;
1615 	int to, tmpret;
1616 	struct dlm_node_iter iter;
1617 	int ret = 0;
1618 	int reassert;
1619 
1620 	BUG_ON(namelen > O2NM_MAX_NAME_LEN);
1621 again:
1622 	reassert = 0;
1623 
1624 	/* note that if this nodemap is empty, it returns 0 */
1625 	dlm_node_iter_init(nodemap, &iter);
1626 	while ((to = dlm_node_iter_next(&iter)) >= 0) {
1627 		int r = 0;
1628 		struct dlm_master_list_entry *mle = NULL;
1629 
1630 		mlog(0, "sending assert master to %d (%.*s)\n", to,
1631 		     namelen, lockname);
1632 		memset(&assert, 0, sizeof(assert));
1633 		assert.node_idx = dlm->node_num;
1634 		assert.namelen = namelen;
1635 		memcpy(assert.name, lockname, namelen);
1636 		assert.flags = cpu_to_be32(flags);
1637 
1638 		tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
1639 					    &assert, sizeof(assert), to, &r);
1640 		if (tmpret < 0) {
1641 			mlog(0, "assert_master returned %d!\n", tmpret);
1642 			if (!dlm_is_host_down(tmpret)) {
1643 				mlog(ML_ERROR, "unhandled error=%d!\n", tmpret);
1644 				BUG();
1645 			}
1646 			/* a node died.  finish out the rest of the nodes. */
1647 			mlog(0, "link to %d went down!\n", to);
1648 			/* any nonzero status return will do */
1649 			ret = tmpret;
1650 		} else if (r < 0) {
1651 			/* ok, something horribly messed.  kill thyself. */
1652 			mlog(ML_ERROR,"during assert master of %.*s to %u, "
1653 			     "got %d.\n", namelen, lockname, to, r);
1654 			spin_lock(&dlm->spinlock);
1655 			spin_lock(&dlm->master_lock);
1656 			if (dlm_find_mle(dlm, &mle, (char *)lockname,
1657 					 namelen)) {
1658 				dlm_print_one_mle(mle);
1659 				__dlm_put_mle(mle);
1660 			}
1661 			spin_unlock(&dlm->master_lock);
1662 			spin_unlock(&dlm->spinlock);
1663 			BUG();
1664 		} else if (r == EAGAIN) {
1665 			mlog(0, "%.*s: node %u create mles on other "
1666 			     "nodes and requests a re-assert\n",
1667 			     namelen, lockname, to);
1668 			reassert = 1;
1669 		}
1670 	}
1671 
1672 	if (reassert)
1673 		goto again;
1674 
1675 	return ret;
1676 }
1677 
1678 /*
1679  * locks that can be taken here:
1680  * dlm->spinlock
1681  * res->spinlock
1682  * mle->spinlock
1683  * dlm->master_list
1684  *
1685  * if possible, TRIM THIS DOWN!!!
1686  */
1687 int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
1688 {
1689 	struct dlm_ctxt *dlm = data;
1690 	struct dlm_master_list_entry *mle = NULL;
1691 	struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
1692 	struct dlm_lock_resource *res = NULL;
1693 	char *name;
1694 	unsigned int namelen, hash;
1695 	u32 flags;
1696 	int master_request = 0;
1697 	int ret = 0;
1698 
1699 	if (!dlm_grab(dlm))
1700 		return 0;
1701 
1702 	name = assert->name;
1703 	namelen = assert->namelen;
1704 	hash = dlm_lockid_hash(name, namelen);
1705 	flags = be32_to_cpu(assert->flags);
1706 
1707 	if (namelen > DLM_LOCKID_NAME_MAX) {
1708 		mlog(ML_ERROR, "Invalid name length!");
1709 		goto done;
1710 	}
1711 
1712 	spin_lock(&dlm->spinlock);
1713 
1714 	if (flags)
1715 		mlog(0, "assert_master with flags: %u\n", flags);
1716 
1717 	/* find the MLE */
1718 	spin_lock(&dlm->master_lock);
1719 	if (!dlm_find_mle(dlm, &mle, name, namelen)) {
1720 		/* not an error, could be master just re-asserting */
1721 		mlog(0, "just got an assert_master from %u, but no "
1722 		     "MLE for it! (%.*s)\n", assert->node_idx,
1723 		     namelen, name);
1724 	} else {
1725 		int bit = find_next_bit (mle->maybe_map, O2NM_MAX_NODES, 0);
1726 		if (bit >= O2NM_MAX_NODES) {
1727 			/* not necessarily an error, though less likely.
1728 			 * could be master just re-asserting. */
1729 			mlog(0, "no bits set in the maybe_map, but %u "
1730 			     "is asserting! (%.*s)\n", assert->node_idx,
1731 			     namelen, name);
1732 		} else if (bit != assert->node_idx) {
1733 			if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1734 				mlog(0, "master %u was found, %u should "
1735 				     "back off\n", assert->node_idx, bit);
1736 			} else {
1737 				/* with the fix for bug 569, a higher node
1738 				 * number winning the mastery will respond
1739 				 * YES to mastery requests, but this node
1740 				 * had no way of knowing.  let it pass. */
1741 				mlog(0, "%u is the lowest node, "
1742 				     "%u is asserting. (%.*s)  %u must "
1743 				     "have begun after %u won.\n", bit,
1744 				     assert->node_idx, namelen, name, bit,
1745 				     assert->node_idx);
1746 			}
1747 		}
1748 		if (mle->type == DLM_MLE_MIGRATION) {
1749 			if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1750 				mlog(0, "%s:%.*s: got cleanup assert"
1751 				     " from %u for migration\n",
1752 				     dlm->name, namelen, name,
1753 				     assert->node_idx);
1754 			} else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) {
1755 				mlog(0, "%s:%.*s: got unrelated assert"
1756 				     " from %u for migration, ignoring\n",
1757 				     dlm->name, namelen, name,
1758 				     assert->node_idx);
1759 				__dlm_put_mle(mle);
1760 				spin_unlock(&dlm->master_lock);
1761 				spin_unlock(&dlm->spinlock);
1762 				goto done;
1763 			}
1764 		}
1765 	}
1766 	spin_unlock(&dlm->master_lock);
1767 
1768 	/* ok everything checks out with the MLE
1769 	 * now check to see if there is a lockres */
1770 	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
1771 	if (res) {
1772 		spin_lock(&res->spinlock);
1773 		if (res->state & DLM_LOCK_RES_RECOVERING)  {
1774 			mlog(ML_ERROR, "%u asserting but %.*s is "
1775 			     "RECOVERING!\n", assert->node_idx, namelen, name);
1776 			goto kill;
1777 		}
1778 		if (!mle) {
1779 			if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN &&
1780 			    res->owner != assert->node_idx) {
1781 				mlog(ML_ERROR, "assert_master from "
1782 					  "%u, but current owner is "
1783 					  "%u! (%.*s)\n",
1784 				       assert->node_idx, res->owner,
1785 				       namelen, name);
1786 				goto kill;
1787 			}
1788 		} else if (mle->type != DLM_MLE_MIGRATION) {
1789 			if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1790 				/* owner is just re-asserting */
1791 				if (res->owner == assert->node_idx) {
1792 					mlog(0, "owner %u re-asserting on "
1793 					     "lock %.*s\n", assert->node_idx,
1794 					     namelen, name);
1795 					goto ok;
1796 				}
1797 				mlog(ML_ERROR, "got assert_master from "
1798 				     "node %u, but %u is the owner! "
1799 				     "(%.*s)\n", assert->node_idx,
1800 				     res->owner, namelen, name);
1801 				goto kill;
1802 			}
1803 			if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1804 				mlog(ML_ERROR, "got assert from %u, but lock "
1805 				     "with no owner should be "
1806 				     "in-progress! (%.*s)\n",
1807 				     assert->node_idx,
1808 				     namelen, name);
1809 				goto kill;
1810 			}
1811 		} else /* mle->type == DLM_MLE_MIGRATION */ {
1812 			/* should only be getting an assert from new master */
1813 			if (assert->node_idx != mle->new_master) {
1814 				mlog(ML_ERROR, "got assert from %u, but "
1815 				     "new master is %u, and old master "
1816 				     "was %u (%.*s)\n",
1817 				     assert->node_idx, mle->new_master,
1818 				     mle->master, namelen, name);
1819 				goto kill;
1820 			}
1821 
1822 		}
1823 ok:
1824 		spin_unlock(&res->spinlock);
1825 	}
1826 	spin_unlock(&dlm->spinlock);
1827 
1828 	// mlog(0, "woo!  got an assert_master from node %u!\n",
1829 	// 	     assert->node_idx);
1830 	if (mle) {
1831 		int extra_ref = 0;
1832 		int nn = -1;
1833 		int rr, err = 0;
1834 
1835 		spin_lock(&mle->spinlock);
1836 		if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
1837 			extra_ref = 1;
1838 		else {
1839 			/* MASTER mle: if any bits set in the response map
1840 			 * then the calling node needs to re-assert to clear
1841 			 * up nodes that this node contacted */
1842 			while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES,
1843 						    nn+1)) < O2NM_MAX_NODES) {
1844 				if (nn != dlm->node_num && nn != assert->node_idx)
1845 					master_request = 1;
1846 			}
1847 		}
1848 		mle->master = assert->node_idx;
1849 		atomic_set(&mle->woken, 1);
1850 		wake_up(&mle->wq);
1851 		spin_unlock(&mle->spinlock);
1852 
1853 		if (res) {
1854 			spin_lock(&res->spinlock);
1855 			if (mle->type == DLM_MLE_MIGRATION) {
1856 				mlog(0, "finishing off migration of lockres %.*s, "
1857 			     		"from %u to %u\n",
1858 			       		res->lockname.len, res->lockname.name,
1859 			       		dlm->node_num, mle->new_master);
1860 				res->state &= ~DLM_LOCK_RES_MIGRATING;
1861 				dlm_change_lockres_owner(dlm, res, mle->new_master);
1862 				BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
1863 			} else {
1864 				dlm_change_lockres_owner(dlm, res, mle->master);
1865 			}
1866 			spin_unlock(&res->spinlock);
1867 		}
1868 
1869 		/* master is known, detach if not already detached.
1870 		 * ensures that only one assert_master call will happen
1871 		 * on this mle. */
1872 		spin_lock(&dlm->spinlock);
1873 		spin_lock(&dlm->master_lock);
1874 
1875 		rr = atomic_read(&mle->mle_refs.refcount);
1876 		if (mle->inuse > 0) {
1877 			if (extra_ref && rr < 3)
1878 				err = 1;
1879 			else if (!extra_ref && rr < 2)
1880 				err = 1;
1881 		} else {
1882 			if (extra_ref && rr < 2)
1883 				err = 1;
1884 			else if (!extra_ref && rr < 1)
1885 				err = 1;
1886 		}
1887 		if (err) {
1888 			mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
1889 			     "that will mess up this node, refs=%d, extra=%d, "
1890 			     "inuse=%d\n", dlm->name, namelen, name,
1891 			     assert->node_idx, rr, extra_ref, mle->inuse);
1892 			dlm_print_one_mle(mle);
1893 		}
1894 		list_del_init(&mle->list);
1895 		__dlm_mle_detach_hb_events(dlm, mle);
1896 		__dlm_put_mle(mle);
1897 		if (extra_ref) {
1898 			/* the assert master message now balances the extra
1899 		 	 * ref given by the master / migration request message.
1900 		 	 * if this is the last put, it will be removed
1901 		 	 * from the list. */
1902 			__dlm_put_mle(mle);
1903 		}
1904 		spin_unlock(&dlm->master_lock);
1905 		spin_unlock(&dlm->spinlock);
1906 	} else if (res) {
1907 		if (res->owner != assert->node_idx) {
1908 			mlog(0, "assert_master from %u, but current "
1909 			     "owner is %u (%.*s), no mle\n", assert->node_idx,
1910 			     res->owner, namelen, name);
1911 		}
1912 	}
1913 
1914 done:
1915 	ret = 0;
1916 	if (res)
1917 		dlm_lockres_put(res);
1918 	dlm_put(dlm);
1919 	if (master_request) {
1920 		mlog(0, "need to tell master to reassert\n");
1921 		ret = EAGAIN;  // positive. negative would shoot down the node.
1922 	}
1923 	return ret;
1924 
1925 kill:
1926 	/* kill the caller! */
1927 	mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
1928 	     "and killing the other node now!  This node is OK and can continue.\n");
1929 	__dlm_print_one_lock_resource(res);
1930 	spin_unlock(&res->spinlock);
1931 	spin_unlock(&dlm->spinlock);
1932 	dlm_lockres_put(res);
1933 	dlm_put(dlm);
1934 	return -EINVAL;
1935 }
1936 
1937 int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
1938 			       struct dlm_lock_resource *res,
1939 			       int ignore_higher, u8 request_from, u32 flags)
1940 {
1941 	struct dlm_work_item *item;
1942 	item = kcalloc(1, sizeof(*item), GFP_NOFS);
1943 	if (!item)
1944 		return -ENOMEM;
1945 
1946 
1947 	/* queue up work for dlm_assert_master_worker */
1948 	dlm_grab(dlm);  /* get an extra ref for the work item */
1949 	dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
1950 	item->u.am.lockres = res; /* already have a ref */
1951 	/* can optionally ignore node numbers higher than this node */
1952 	item->u.am.ignore_higher = ignore_higher;
1953 	item->u.am.request_from = request_from;
1954 	item->u.am.flags = flags;
1955 
1956 	if (ignore_higher)
1957 		mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len,
1958 		     res->lockname.name);
1959 
1960 	spin_lock(&dlm->work_lock);
1961 	list_add_tail(&item->list, &dlm->work_list);
1962 	spin_unlock(&dlm->work_lock);
1963 
1964 	queue_work(dlm->dlm_worker, &dlm->dispatched_work);
1965 	return 0;
1966 }
1967 
1968 static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
1969 {
1970 	struct dlm_ctxt *dlm = data;
1971 	int ret = 0;
1972 	struct dlm_lock_resource *res;
1973 	unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
1974 	int ignore_higher;
1975 	int bit;
1976 	u8 request_from;
1977 	u32 flags;
1978 
1979 	dlm = item->dlm;
1980 	res = item->u.am.lockres;
1981 	ignore_higher = item->u.am.ignore_higher;
1982 	request_from = item->u.am.request_from;
1983 	flags = item->u.am.flags;
1984 
1985 	spin_lock(&dlm->spinlock);
1986 	memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
1987 	spin_unlock(&dlm->spinlock);
1988 
1989 	clear_bit(dlm->node_num, nodemap);
1990 	if (ignore_higher) {
1991 		/* if is this just to clear up mles for nodes below
1992 		 * this node, do not send the message to the original
1993 		 * caller or any node number higher than this */
1994 		clear_bit(request_from, nodemap);
1995 		bit = dlm->node_num;
1996 		while (1) {
1997 			bit = find_next_bit(nodemap, O2NM_MAX_NODES,
1998 					    bit+1);
1999 		       	if (bit >= O2NM_MAX_NODES)
2000 				break;
2001 			clear_bit(bit, nodemap);
2002 		}
2003 	}
2004 
2005 	/*
2006 	 * If we're migrating this lock to someone else, we are no
2007 	 * longer allowed to assert out own mastery.  OTOH, we need to
2008 	 * prevent migration from starting while we're still asserting
2009 	 * our dominance.  The reserved ast delays migration.
2010 	 */
2011 	spin_lock(&res->spinlock);
2012 	if (res->state & DLM_LOCK_RES_MIGRATING) {
2013 		mlog(0, "Someone asked us to assert mastery, but we're "
2014 		     "in the middle of migration.  Skipping assert, "
2015 		     "the new master will handle that.\n");
2016 		spin_unlock(&res->spinlock);
2017 		goto put;
2018 	} else
2019 		__dlm_lockres_reserve_ast(res);
2020 	spin_unlock(&res->spinlock);
2021 
2022 	/* this call now finishes out the nodemap
2023 	 * even if one or more nodes die */
2024 	mlog(0, "worker about to master %.*s here, this=%u\n",
2025 		     res->lockname.len, res->lockname.name, dlm->node_num);
2026 	ret = dlm_do_assert_master(dlm, res->lockname.name,
2027 				   res->lockname.len,
2028 				   nodemap, flags);
2029 	if (ret < 0) {
2030 		/* no need to restart, we are done */
2031 		if (!dlm_is_host_down(ret))
2032 			mlog_errno(ret);
2033 	}
2034 
2035 	/* Ok, we've asserted ourselves.  Let's let migration start. */
2036 	dlm_lockres_release_ast(dlm, res);
2037 
2038 put:
2039 	dlm_lockres_put(res);
2040 
2041 	mlog(0, "finished with dlm_assert_master_worker\n");
2042 }
2043 
2044 /* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
2045  * We cannot wait for node recovery to complete to begin mastering this
2046  * lockres because this lockres is used to kick off recovery! ;-)
2047  * So, do a pre-check on all living nodes to see if any of those nodes
2048  * think that $RECOVERY is currently mastered by a dead node.  If so,
2049  * we wait a short time to allow that node to get notified by its own
2050  * heartbeat stack, then check again.  All $RECOVERY lock resources
2051  * mastered by dead nodes are purged when the hearbeat callback is
2052  * fired, so we can know for sure that it is safe to continue once
2053  * the node returns a live node or no node.  */
2054 static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
2055 				       struct dlm_lock_resource *res)
2056 {
2057 	struct dlm_node_iter iter;
2058 	int nodenum;
2059 	int ret = 0;
2060 	u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;
2061 
2062 	spin_lock(&dlm->spinlock);
2063 	dlm_node_iter_init(dlm->domain_map, &iter);
2064 	spin_unlock(&dlm->spinlock);
2065 
2066 	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
2067 		/* do not send to self */
2068 		if (nodenum == dlm->node_num)
2069 			continue;
2070 		ret = dlm_do_master_requery(dlm, res, nodenum, &master);
2071 		if (ret < 0) {
2072 			mlog_errno(ret);
2073 			if (!dlm_is_host_down(ret))
2074 				BUG();
2075 			/* host is down, so answer for that node would be
2076 			 * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
2077 			ret = 0;
2078 		}
2079 
2080 		if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
2081 			/* check to see if this master is in the recovery map */
2082 			spin_lock(&dlm->spinlock);
2083 			if (test_bit(master, dlm->recovery_map)) {
2084 				mlog(ML_NOTICE, "%s: node %u has not seen "
2085 				     "node %u go down yet, and thinks the "
2086 				     "dead node is mastering the recovery "
2087 				     "lock.  must wait.\n", dlm->name,
2088 				     nodenum, master);
2089 				ret = -EAGAIN;
2090 			}
2091 			spin_unlock(&dlm->spinlock);
2092 			mlog(0, "%s: reco lock master is %u\n", dlm->name,
2093 			     master);
2094 			break;
2095 		}
2096 	}
2097 	return ret;
2098 }
2099 
2100 
2101 /*
2102  * DLM_MIGRATE_LOCKRES
2103  */
2104 
2105 
2106 int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
2107 			u8 target)
2108 {
2109 	struct dlm_master_list_entry *mle = NULL;
2110 	struct dlm_master_list_entry *oldmle = NULL;
2111  	struct dlm_migratable_lockres *mres = NULL;
2112 	int ret = -EINVAL;
2113 	const char *name;
2114 	unsigned int namelen;
2115 	int mle_added = 0;
2116 	struct list_head *queue, *iter;
2117 	int i;
2118 	struct dlm_lock *lock;
2119 	int empty = 1;
2120 
2121 	if (!dlm_grab(dlm))
2122 		return -EINVAL;
2123 
2124 	name = res->lockname.name;
2125 	namelen = res->lockname.len;
2126 
2127 	mlog(0, "migrating %.*s to %u\n", namelen, name, target);
2128 
2129 	/*
2130 	 * ensure this lockres is a proper candidate for migration
2131 	 */
2132 	spin_lock(&res->spinlock);
2133 	if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
2134 		mlog(0, "cannot migrate lockres with unknown owner!\n");
2135 		spin_unlock(&res->spinlock);
2136 		goto leave;
2137 	}
2138 	if (res->owner != dlm->node_num) {
2139 		mlog(0, "cannot migrate lockres this node doesn't own!\n");
2140 		spin_unlock(&res->spinlock);
2141 		goto leave;
2142 	}
2143 	mlog(0, "checking queues...\n");
2144 	queue = &res->granted;
2145 	for (i=0; i<3; i++) {
2146 		list_for_each(iter, queue) {
2147 			lock = list_entry (iter, struct dlm_lock, list);
2148 			empty = 0;
2149 			if (lock->ml.node == dlm->node_num) {
2150 				mlog(0, "found a lock owned by this node "
2151 				     "still on the %s queue!  will not "
2152 				     "migrate this lockres\n",
2153 				     i==0 ? "granted" :
2154 				     (i==1 ? "converting" : "blocked"));
2155 				spin_unlock(&res->spinlock);
2156 				ret = -ENOTEMPTY;
2157 				goto leave;
2158 			}
2159 		}
2160 		queue++;
2161 	}
2162 	mlog(0, "all locks on this lockres are nonlocal.  continuing\n");
2163 	spin_unlock(&res->spinlock);
2164 
2165 	/* no work to do */
2166 	if (empty) {
2167 		mlog(0, "no locks were found on this lockres! done!\n");
2168 		ret = 0;
2169 		goto leave;
2170 	}
2171 
2172 	/*
2173 	 * preallocate up front
2174 	 * if this fails, abort
2175 	 */
2176 
2177 	ret = -ENOMEM;
2178 	mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_NOFS);
2179 	if (!mres) {
2180 		mlog_errno(ret);
2181 		goto leave;
2182 	}
2183 
2184 	mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
2185 								GFP_NOFS);
2186 	if (!mle) {
2187 		mlog_errno(ret);
2188 		goto leave;
2189 	}
2190 	ret = 0;
2191 
2192 	/*
2193 	 * find a node to migrate the lockres to
2194 	 */
2195 
2196 	mlog(0, "picking a migration node\n");
2197 	spin_lock(&dlm->spinlock);
2198 	/* pick a new node */
2199 	if (!test_bit(target, dlm->domain_map) ||
2200 	    target >= O2NM_MAX_NODES) {
2201 		target = dlm_pick_migration_target(dlm, res);
2202 	}
2203 	mlog(0, "node %u chosen for migration\n", target);
2204 
2205 	if (target >= O2NM_MAX_NODES ||
2206 	    !test_bit(target, dlm->domain_map)) {
2207 		/* target chosen is not alive */
2208 		ret = -EINVAL;
2209 	}
2210 
2211 	if (ret) {
2212 		spin_unlock(&dlm->spinlock);
2213 		goto fail;
2214 	}
2215 
2216 	mlog(0, "continuing with target = %u\n", target);
2217 
2218 	/*
2219 	 * clear any existing master requests and
2220 	 * add the migration mle to the list
2221 	 */
2222 	spin_lock(&dlm->master_lock);
2223 	ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
2224 				    namelen, target, dlm->node_num);
2225 	spin_unlock(&dlm->master_lock);
2226 	spin_unlock(&dlm->spinlock);
2227 
2228 	if (ret == -EEXIST) {
2229 		mlog(0, "another process is already migrating it\n");
2230 		goto fail;
2231 	}
2232 	mle_added = 1;
2233 
2234 	/*
2235 	 * set the MIGRATING flag and flush asts
2236 	 * if we fail after this we need to re-dirty the lockres
2237 	 */
2238 	if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
2239 		mlog(ML_ERROR, "tried to migrate %.*s to %u, but "
2240 		     "the target went down.\n", res->lockname.len,
2241 		     res->lockname.name, target);
2242 		spin_lock(&res->spinlock);
2243 		res->state &= ~DLM_LOCK_RES_MIGRATING;
2244 		spin_unlock(&res->spinlock);
2245 		ret = -EINVAL;
2246 	}
2247 
2248 fail:
2249 	if (oldmle) {
2250 		/* master is known, detach if not already detached */
2251 		dlm_mle_detach_hb_events(dlm, oldmle);
2252 		dlm_put_mle(oldmle);
2253 	}
2254 
2255 	if (ret < 0) {
2256 		if (mle_added) {
2257 			dlm_mle_detach_hb_events(dlm, mle);
2258 			dlm_put_mle(mle);
2259 		} else if (mle) {
2260 			kmem_cache_free(dlm_mle_cache, mle);
2261 		}
2262 		goto leave;
2263 	}
2264 
2265 	/*
2266 	 * at this point, we have a migration target, an mle
2267 	 * in the master list, and the MIGRATING flag set on
2268 	 * the lockres
2269 	 */
2270 
2271 
2272 	/* get an extra reference on the mle.
2273 	 * otherwise the assert_master from the new
2274 	 * master will destroy this.
2275 	 * also, make sure that all callers of dlm_get_mle
2276 	 * take both dlm->spinlock and dlm->master_lock */
2277 	spin_lock(&dlm->spinlock);
2278 	spin_lock(&dlm->master_lock);
2279 	dlm_get_mle_inuse(mle);
2280 	spin_unlock(&dlm->master_lock);
2281 	spin_unlock(&dlm->spinlock);
2282 
2283 	/* notify new node and send all lock state */
2284 	/* call send_one_lockres with migration flag.
2285 	 * this serves as notice to the target node that a
2286 	 * migration is starting. */
2287 	ret = dlm_send_one_lockres(dlm, res, mres, target,
2288 				   DLM_MRES_MIGRATION);
2289 
2290 	if (ret < 0) {
2291 		mlog(0, "migration to node %u failed with %d\n",
2292 		     target, ret);
2293 		/* migration failed, detach and clean up mle */
2294 		dlm_mle_detach_hb_events(dlm, mle);
2295 		dlm_put_mle(mle);
2296 		dlm_put_mle_inuse(mle);
2297 		spin_lock(&res->spinlock);
2298 		res->state &= ~DLM_LOCK_RES_MIGRATING;
2299 		spin_unlock(&res->spinlock);
2300 		goto leave;
2301 	}
2302 
2303 	/* at this point, the target sends a message to all nodes,
2304 	 * (using dlm_do_migrate_request).  this node is skipped since
2305 	 * we had to put an mle in the list to begin the process.  this
2306 	 * node now waits for target to do an assert master.  this node
2307 	 * will be the last one notified, ensuring that the migration
2308 	 * is complete everywhere.  if the target dies while this is
2309 	 * going on, some nodes could potentially see the target as the
2310 	 * master, so it is important that my recovery finds the migration
2311 	 * mle and sets the master to UNKNONWN. */
2312 
2313 
2314 	/* wait for new node to assert master */
2315 	while (1) {
2316 		ret = wait_event_interruptible_timeout(mle->wq,
2317 					(atomic_read(&mle->woken) == 1),
2318 					msecs_to_jiffies(5000));
2319 
2320 		if (ret >= 0) {
2321 		       	if (atomic_read(&mle->woken) == 1 ||
2322 			    res->owner == target)
2323 				break;
2324 
2325 			mlog(0, "timed out during migration\n");
2326 			/* avoid hang during shutdown when migrating lockres
2327 			 * to a node which also goes down */
2328 			if (dlm_is_node_dead(dlm, target)) {
2329 				mlog(0, "%s:%.*s: expected migration "
2330 				     "target %u is no longer up, restarting\n",
2331 				     dlm->name, res->lockname.len,
2332 				     res->lockname.name, target);
2333 				ret = -ERESTARTSYS;
2334 			}
2335 		}
2336 		if (ret == -ERESTARTSYS) {
2337 			/* migration failed, detach and clean up mle */
2338 			dlm_mle_detach_hb_events(dlm, mle);
2339 			dlm_put_mle(mle);
2340 			dlm_put_mle_inuse(mle);
2341 			spin_lock(&res->spinlock);
2342 			res->state &= ~DLM_LOCK_RES_MIGRATING;
2343 			spin_unlock(&res->spinlock);
2344 			goto leave;
2345 		}
2346 		/* TODO: if node died: stop, clean up, return error */
2347 	}
2348 
2349 	/* all done, set the owner, clear the flag */
2350 	spin_lock(&res->spinlock);
2351 	dlm_set_lockres_owner(dlm, res, target);
2352 	res->state &= ~DLM_LOCK_RES_MIGRATING;
2353 	dlm_remove_nonlocal_locks(dlm, res);
2354 	spin_unlock(&res->spinlock);
2355 	wake_up(&res->wq);
2356 
2357 	/* master is known, detach if not already detached */
2358 	dlm_mle_detach_hb_events(dlm, mle);
2359 	dlm_put_mle_inuse(mle);
2360 	ret = 0;
2361 
2362 	dlm_lockres_calc_usage(dlm, res);
2363 
2364 leave:
2365 	/* re-dirty the lockres if we failed */
2366 	if (ret < 0)
2367 		dlm_kick_thread(dlm, res);
2368 
2369 	/* TODO: cleanup */
2370 	if (mres)
2371 		free_page((unsigned long)mres);
2372 
2373 	dlm_put(dlm);
2374 
2375 	mlog(0, "returning %d\n", ret);
2376 	return ret;
2377 }
2378 
2379 int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
2380 {
2381 	int ret;
2382 	spin_lock(&dlm->ast_lock);
2383 	spin_lock(&lock->spinlock);
2384 	ret = (list_empty(&lock->bast_list) && !lock->bast_pending);
2385 	spin_unlock(&lock->spinlock);
2386 	spin_unlock(&dlm->ast_lock);
2387 	return ret;
2388 }
2389 
2390 static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
2391 				     struct dlm_lock_resource *res,
2392 				     u8 mig_target)
2393 {
2394 	int can_proceed;
2395 	spin_lock(&res->spinlock);
2396 	can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);
2397 	spin_unlock(&res->spinlock);
2398 
2399 	/* target has died, so make the caller break out of the
2400 	 * wait_event, but caller must recheck the domain_map */
2401 	spin_lock(&dlm->spinlock);
2402 	if (!test_bit(mig_target, dlm->domain_map))
2403 		can_proceed = 1;
2404 	spin_unlock(&dlm->spinlock);
2405 	return can_proceed;
2406 }
2407 
2408 int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2409 {
2410 	int ret;
2411 	spin_lock(&res->spinlock);
2412 	ret = !!(res->state & DLM_LOCK_RES_DIRTY);
2413 	spin_unlock(&res->spinlock);
2414 	return ret;
2415 }
2416 
2417 
2418 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
2419 				       struct dlm_lock_resource *res,
2420 				       u8 target)
2421 {
2422 	int ret = 0;
2423 
2424 	mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u\n",
2425 	       res->lockname.len, res->lockname.name, dlm->node_num,
2426 	       target);
2427 	/* need to set MIGRATING flag on lockres.  this is done by
2428 	 * ensuring that all asts have been flushed for this lockres. */
2429 	spin_lock(&res->spinlock);
2430 	BUG_ON(res->migration_pending);
2431 	res->migration_pending = 1;
2432 	/* strategy is to reserve an extra ast then release
2433 	 * it below, letting the release do all of the work */
2434 	__dlm_lockres_reserve_ast(res);
2435 	spin_unlock(&res->spinlock);
2436 
2437 	/* now flush all the pending asts.. hang out for a bit */
2438 	dlm_kick_thread(dlm, res);
2439 	wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
2440 	dlm_lockres_release_ast(dlm, res);
2441 
2442 	mlog(0, "about to wait on migration_wq, dirty=%s\n",
2443 	       res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
2444 	/* if the extra ref we just put was the final one, this
2445 	 * will pass thru immediately.  otherwise, we need to wait
2446 	 * for the last ast to finish. */
2447 again:
2448 	ret = wait_event_interruptible_timeout(dlm->migration_wq,
2449 		   dlm_migration_can_proceed(dlm, res, target),
2450 		   msecs_to_jiffies(1000));
2451 	if (ret < 0) {
2452 		mlog(0, "woken again: migrating? %s, dead? %s\n",
2453 		       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2454 		       test_bit(target, dlm->domain_map) ? "no":"yes");
2455 	} else {
2456 		mlog(0, "all is well: migrating? %s, dead? %s\n",
2457 		       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2458 		       test_bit(target, dlm->domain_map) ? "no":"yes");
2459 	}
2460 	if (!dlm_migration_can_proceed(dlm, res, target)) {
2461 		mlog(0, "trying again...\n");
2462 		goto again;
2463 	}
2464 
2465 	/* did the target go down or die? */
2466 	spin_lock(&dlm->spinlock);
2467 	if (!test_bit(target, dlm->domain_map)) {
2468 		mlog(ML_ERROR, "aha. migration target %u just went down\n",
2469 		     target);
2470 		ret = -EHOSTDOWN;
2471 	}
2472 	spin_unlock(&dlm->spinlock);
2473 
2474 	/*
2475 	 * at this point:
2476 	 *
2477 	 *   o the DLM_LOCK_RES_MIGRATING flag is set
2478 	 *   o there are no pending asts on this lockres
2479 	 *   o all processes trying to reserve an ast on this
2480 	 *     lockres must wait for the MIGRATING flag to clear
2481 	 */
2482 	return ret;
2483 }
2484 
2485 /* last step in the migration process.
2486  * original master calls this to free all of the dlm_lock
2487  * structures that used to be for other nodes. */
2488 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2489 				      struct dlm_lock_resource *res)
2490 {
2491 	struct list_head *iter, *iter2;
2492 	struct list_head *queue = &res->granted;
2493 	int i;
2494 	struct dlm_lock *lock;
2495 
2496 	assert_spin_locked(&res->spinlock);
2497 
2498 	BUG_ON(res->owner == dlm->node_num);
2499 
2500 	for (i=0; i<3; i++) {
2501 		list_for_each_safe(iter, iter2, queue) {
2502 			lock = list_entry (iter, struct dlm_lock, list);
2503 			if (lock->ml.node != dlm->node_num) {
2504 				mlog(0, "putting lock for node %u\n",
2505 				     lock->ml.node);
2506 				/* be extra careful */
2507 				BUG_ON(!list_empty(&lock->ast_list));
2508 				BUG_ON(!list_empty(&lock->bast_list));
2509 				BUG_ON(lock->ast_pending);
2510 				BUG_ON(lock->bast_pending);
2511 				list_del_init(&lock->list);
2512 				dlm_lock_put(lock);
2513 			}
2514 		}
2515 		queue++;
2516 	}
2517 }
2518 
2519 /* for now this is not too intelligent.  we will
2520  * need stats to make this do the right thing.
2521  * this just finds the first lock on one of the
2522  * queues and uses that node as the target. */
2523 static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
2524 				    struct dlm_lock_resource *res)
2525 {
2526 	int i;
2527 	struct list_head *queue = &res->granted;
2528 	struct list_head *iter;
2529 	struct dlm_lock *lock;
2530 	int nodenum;
2531 
2532 	assert_spin_locked(&dlm->spinlock);
2533 
2534 	spin_lock(&res->spinlock);
2535 	for (i=0; i<3; i++) {
2536 		list_for_each(iter, queue) {
2537 			/* up to the caller to make sure this node
2538 			 * is alive */
2539 			lock = list_entry (iter, struct dlm_lock, list);
2540 			if (lock->ml.node != dlm->node_num) {
2541 				spin_unlock(&res->spinlock);
2542 				return lock->ml.node;
2543 			}
2544 		}
2545 		queue++;
2546 	}
2547 	spin_unlock(&res->spinlock);
2548 	mlog(0, "have not found a suitable target yet! checking domain map\n");
2549 
2550 	/* ok now we're getting desperate.  pick anyone alive. */
2551 	nodenum = -1;
2552 	while (1) {
2553 		nodenum = find_next_bit(dlm->domain_map,
2554 					O2NM_MAX_NODES, nodenum+1);
2555 		mlog(0, "found %d in domain map\n", nodenum);
2556 		if (nodenum >= O2NM_MAX_NODES)
2557 			break;
2558 		if (nodenum != dlm->node_num) {
2559 			mlog(0, "picking %d\n", nodenum);
2560 			return nodenum;
2561 		}
2562 	}
2563 
2564 	mlog(0, "giving up.  no master to migrate to\n");
2565 	return DLM_LOCK_RES_OWNER_UNKNOWN;
2566 }
2567 
2568 
2569 
2570 /* this is called by the new master once all lockres
2571  * data has been received */
2572 static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
2573 				  struct dlm_lock_resource *res,
2574 				  u8 master, u8 new_master,
2575 				  struct dlm_node_iter *iter)
2576 {
2577 	struct dlm_migrate_request migrate;
2578 	int ret, status = 0;
2579 	int nodenum;
2580 
2581 	memset(&migrate, 0, sizeof(migrate));
2582 	migrate.namelen = res->lockname.len;
2583 	memcpy(migrate.name, res->lockname.name, migrate.namelen);
2584 	migrate.new_master = new_master;
2585 	migrate.master = master;
2586 
2587 	ret = 0;
2588 
2589 	/* send message to all nodes, except the master and myself */
2590 	while ((nodenum = dlm_node_iter_next(iter)) >= 0) {
2591 		if (nodenum == master ||
2592 		    nodenum == new_master)
2593 			continue;
2594 
2595 		ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
2596 					 &migrate, sizeof(migrate), nodenum,
2597 					 &status);
2598 		if (ret < 0)
2599 			mlog_errno(ret);
2600 		else if (status < 0) {
2601 			mlog(0, "migrate request (node %u) returned %d!\n",
2602 			     nodenum, status);
2603 			ret = status;
2604 		}
2605 	}
2606 
2607 	if (ret < 0)
2608 		mlog_errno(ret);
2609 
2610 	mlog(0, "returning ret=%d\n", ret);
2611 	return ret;
2612 }
2613 
2614 
2615 /* if there is an existing mle for this lockres, we now know who the master is.
2616  * (the one who sent us *this* message) we can clear it up right away.
2617  * since the process that put the mle on the list still has a reference to it,
2618  * we can unhash it now, set the master and wake the process.  as a result,
2619  * we will have no mle in the list to start with.  now we can add an mle for
2620  * the migration and this should be the only one found for those scanning the
2621  * list.  */
2622 int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data)
2623 {
2624 	struct dlm_ctxt *dlm = data;
2625 	struct dlm_lock_resource *res = NULL;
2626 	struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
2627 	struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
2628 	const char *name;
2629 	unsigned int namelen, hash;
2630 	int ret = 0;
2631 
2632 	if (!dlm_grab(dlm))
2633 		return -EINVAL;
2634 
2635 	name = migrate->name;
2636 	namelen = migrate->namelen;
2637 	hash = dlm_lockid_hash(name, namelen);
2638 
2639 	/* preallocate.. if this fails, abort */
2640 	mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
2641 							 GFP_NOFS);
2642 
2643 	if (!mle) {
2644 		ret = -ENOMEM;
2645 		goto leave;
2646 	}
2647 
2648 	/* check for pre-existing lock */
2649 	spin_lock(&dlm->spinlock);
2650 	res = __dlm_lookup_lockres(dlm, name, namelen, hash);
2651 	spin_lock(&dlm->master_lock);
2652 
2653 	if (res) {
2654 		spin_lock(&res->spinlock);
2655 		if (res->state & DLM_LOCK_RES_RECOVERING) {
2656 			/* if all is working ok, this can only mean that we got
2657 		 	* a migrate request from a node that we now see as
2658 		 	* dead.  what can we do here?  drop it to the floor? */
2659 			spin_unlock(&res->spinlock);
2660 			mlog(ML_ERROR, "Got a migrate request, but the "
2661 			     "lockres is marked as recovering!");
2662 			kmem_cache_free(dlm_mle_cache, mle);
2663 			ret = -EINVAL; /* need a better solution */
2664 			goto unlock;
2665 		}
2666 		res->state |= DLM_LOCK_RES_MIGRATING;
2667 		spin_unlock(&res->spinlock);
2668 	}
2669 
2670 	/* ignore status.  only nonzero status would BUG. */
2671 	ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
2672 				    name, namelen,
2673 				    migrate->new_master,
2674 				    migrate->master);
2675 
2676 unlock:
2677 	spin_unlock(&dlm->master_lock);
2678 	spin_unlock(&dlm->spinlock);
2679 
2680 	if (oldmle) {
2681 		/* master is known, detach if not already detached */
2682 		dlm_mle_detach_hb_events(dlm, oldmle);
2683 		dlm_put_mle(oldmle);
2684 	}
2685 
2686 	if (res)
2687 		dlm_lockres_put(res);
2688 leave:
2689 	dlm_put(dlm);
2690 	return ret;
2691 }
2692 
2693 /* must be holding dlm->spinlock and dlm->master_lock
2694  * when adding a migration mle, we can clear any other mles
2695  * in the master list because we know with certainty that
2696  * the master is "master".  so we remove any old mle from
2697  * the list after setting it's master field, and then add
2698  * the new migration mle.  this way we can hold with the rule
2699  * of having only one mle for a given lock name at all times. */
2700 static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
2701 				 struct dlm_lock_resource *res,
2702 				 struct dlm_master_list_entry *mle,
2703 				 struct dlm_master_list_entry **oldmle,
2704 				 const char *name, unsigned int namelen,
2705 				 u8 new_master, u8 master)
2706 {
2707 	int found;
2708 	int ret = 0;
2709 
2710 	*oldmle = NULL;
2711 
2712 	mlog_entry_void();
2713 
2714 	assert_spin_locked(&dlm->spinlock);
2715 	assert_spin_locked(&dlm->master_lock);
2716 
2717 	/* caller is responsible for any ref taken here on oldmle */
2718 	found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
2719 	if (found) {
2720 		struct dlm_master_list_entry *tmp = *oldmle;
2721 		spin_lock(&tmp->spinlock);
2722 		if (tmp->type == DLM_MLE_MIGRATION) {
2723 			if (master == dlm->node_num) {
2724 				/* ah another process raced me to it */
2725 				mlog(0, "tried to migrate %.*s, but some "
2726 				     "process beat me to it\n",
2727 				     namelen, name);
2728 				ret = -EEXIST;
2729 			} else {
2730 				/* bad.  2 NODES are trying to migrate! */
2731 				mlog(ML_ERROR, "migration error  mle: "
2732 				     "master=%u new_master=%u // request: "
2733 				     "master=%u new_master=%u // "
2734 				     "lockres=%.*s\n",
2735 				     tmp->master, tmp->new_master,
2736 				     master, new_master,
2737 				     namelen, name);
2738 				BUG();
2739 			}
2740 		} else {
2741 			/* this is essentially what assert_master does */
2742 			tmp->master = master;
2743 			atomic_set(&tmp->woken, 1);
2744 			wake_up(&tmp->wq);
2745 			/* remove it from the list so that only one
2746 			 * mle will be found */
2747 			list_del_init(&tmp->list);
2748 			__dlm_mle_detach_hb_events(dlm, mle);
2749 		}
2750 		spin_unlock(&tmp->spinlock);
2751 	}
2752 
2753 	/* now add a migration mle to the tail of the list */
2754 	dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
2755 	mle->new_master = new_master;
2756 	mle->master = master;
2757 	/* do this for consistency with other mle types */
2758 	set_bit(new_master, mle->maybe_map);
2759 	list_add(&mle->list, &dlm->master_list);
2760 
2761 	return ret;
2762 }
2763 
2764 
2765 void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
2766 {
2767 	struct list_head *iter, *iter2;
2768 	struct dlm_master_list_entry *mle;
2769 	struct dlm_lock_resource *res;
2770 	unsigned int hash;
2771 
2772 	mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node);
2773 top:
2774 	assert_spin_locked(&dlm->spinlock);
2775 
2776 	/* clean the master list */
2777 	spin_lock(&dlm->master_lock);
2778 	list_for_each_safe(iter, iter2, &dlm->master_list) {
2779 		mle = list_entry(iter, struct dlm_master_list_entry, list);
2780 
2781 		BUG_ON(mle->type != DLM_MLE_BLOCK &&
2782 		       mle->type != DLM_MLE_MASTER &&
2783 		       mle->type != DLM_MLE_MIGRATION);
2784 
2785 		/* MASTER mles are initiated locally.  the waiting
2786 		 * process will notice the node map change
2787 		 * shortly.  let that happen as normal. */
2788 		if (mle->type == DLM_MLE_MASTER)
2789 			continue;
2790 
2791 
2792 		/* BLOCK mles are initiated by other nodes.
2793 		 * need to clean up if the dead node would have
2794 		 * been the master. */
2795 		if (mle->type == DLM_MLE_BLOCK) {
2796 			int bit;
2797 
2798 			spin_lock(&mle->spinlock);
2799 			bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
2800 			if (bit != dead_node) {
2801 				mlog(0, "mle found, but dead node %u would "
2802 				     "not have been master\n", dead_node);
2803 				spin_unlock(&mle->spinlock);
2804 			} else {
2805 				/* must drop the refcount by one since the
2806 				 * assert_master will never arrive.  this
2807 				 * may result in the mle being unlinked and
2808 				 * freed, but there may still be a process
2809 				 * waiting in the dlmlock path which is fine. */
2810 				mlog(0, "node %u was expected master\n",
2811 				     dead_node);
2812 				atomic_set(&mle->woken, 1);
2813 				spin_unlock(&mle->spinlock);
2814 				wake_up(&mle->wq);
2815 				/* do not need events any longer, so detach
2816 				 * from heartbeat */
2817 				__dlm_mle_detach_hb_events(dlm, mle);
2818 				__dlm_put_mle(mle);
2819 			}
2820 			continue;
2821 		}
2822 
2823 		/* everything else is a MIGRATION mle */
2824 
2825 		/* the rule for MIGRATION mles is that the master
2826 		 * becomes UNKNOWN if *either* the original or
2827 		 * the new master dies.  all UNKNOWN lockreses
2828 		 * are sent to whichever node becomes the recovery
2829 		 * master.  the new master is responsible for
2830 		 * determining if there is still a master for
2831 		 * this lockres, or if he needs to take over
2832 		 * mastery.  either way, this node should expect
2833 		 * another message to resolve this. */
2834 		if (mle->master != dead_node &&
2835 		    mle->new_master != dead_node)
2836 			continue;
2837 
2838 		/* if we have reached this point, this mle needs to
2839 		 * be removed from the list and freed. */
2840 
2841 		/* remove from the list early.  NOTE: unlinking
2842 		 * list_head while in list_for_each_safe */
2843 		__dlm_mle_detach_hb_events(dlm, mle);
2844 		spin_lock(&mle->spinlock);
2845 		list_del_init(&mle->list);
2846 		atomic_set(&mle->woken, 1);
2847 		spin_unlock(&mle->spinlock);
2848 		wake_up(&mle->wq);
2849 
2850 		mlog(0, "%s: node %u died during migration from "
2851 		     "%u to %u!\n", dlm->name, dead_node,
2852 		     mle->master, mle->new_master);
2853 		/* if there is a lockres associated with this
2854 	 	 * mle, find it and set its owner to UNKNOWN */
2855 		hash = dlm_lockid_hash(mle->u.name.name, mle->u.name.len);
2856 		res = __dlm_lookup_lockres(dlm, mle->u.name.name,
2857 					   mle->u.name.len, hash);
2858 		if (res) {
2859 			/* unfortunately if we hit this rare case, our
2860 		 	 * lock ordering is messed.  we need to drop
2861 		 	 * the master lock so that we can take the
2862 		  	 * lockres lock, meaning that we will have to
2863 			 * restart from the head of list. */
2864 			spin_unlock(&dlm->master_lock);
2865 
2866 			/* move lockres onto recovery list */
2867 			spin_lock(&res->spinlock);
2868 			dlm_set_lockres_owner(dlm, res,
2869 				      	DLM_LOCK_RES_OWNER_UNKNOWN);
2870 			dlm_move_lockres_to_recovery_list(dlm, res);
2871 			spin_unlock(&res->spinlock);
2872 			dlm_lockres_put(res);
2873 
2874 			/* about to get rid of mle, detach from heartbeat */
2875 			__dlm_mle_detach_hb_events(dlm, mle);
2876 
2877 			/* dump the mle */
2878 			spin_lock(&dlm->master_lock);
2879 			__dlm_put_mle(mle);
2880 			spin_unlock(&dlm->master_lock);
2881 
2882 			/* restart */
2883 			goto top;
2884 		}
2885 
2886 		/* this may be the last reference */
2887 		__dlm_put_mle(mle);
2888 	}
2889 	spin_unlock(&dlm->master_lock);
2890 }
2891 
2892 
2893 int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
2894 			 u8 old_master)
2895 {
2896 	struct dlm_node_iter iter;
2897 	int ret = 0;
2898 
2899 	spin_lock(&dlm->spinlock);
2900 	dlm_node_iter_init(dlm->domain_map, &iter);
2901 	clear_bit(old_master, iter.node_map);
2902 	clear_bit(dlm->node_num, iter.node_map);
2903 	spin_unlock(&dlm->spinlock);
2904 
2905 	mlog(0, "now time to do a migrate request to other nodes\n");
2906 	ret = dlm_do_migrate_request(dlm, res, old_master,
2907 				     dlm->node_num, &iter);
2908 	if (ret < 0) {
2909 		mlog_errno(ret);
2910 		goto leave;
2911 	}
2912 
2913 	mlog(0, "doing assert master of %.*s to all except the original node\n",
2914 	     res->lockname.len, res->lockname.name);
2915 	/* this call now finishes out the nodemap
2916 	 * even if one or more nodes die */
2917 	ret = dlm_do_assert_master(dlm, res->lockname.name,
2918 				   res->lockname.len, iter.node_map,
2919 				   DLM_ASSERT_MASTER_FINISH_MIGRATION);
2920 	if (ret < 0) {
2921 		/* no longer need to retry.  all living nodes contacted. */
2922 		mlog_errno(ret);
2923 		ret = 0;
2924 	}
2925 
2926 	memset(iter.node_map, 0, sizeof(iter.node_map));
2927 	set_bit(old_master, iter.node_map);
2928 	mlog(0, "doing assert master of %.*s back to %u\n",
2929 	     res->lockname.len, res->lockname.name, old_master);
2930 	ret = dlm_do_assert_master(dlm, res->lockname.name,
2931 				   res->lockname.len, iter.node_map,
2932 				   DLM_ASSERT_MASTER_FINISH_MIGRATION);
2933 	if (ret < 0) {
2934 		mlog(0, "assert master to original master failed "
2935 		     "with %d.\n", ret);
2936 		/* the only nonzero status here would be because of
2937 		 * a dead original node.  we're done. */
2938 		ret = 0;
2939 	}
2940 
2941 	/* all done, set the owner, clear the flag */
2942 	spin_lock(&res->spinlock);
2943 	dlm_set_lockres_owner(dlm, res, dlm->node_num);
2944 	res->state &= ~DLM_LOCK_RES_MIGRATING;
2945 	spin_unlock(&res->spinlock);
2946 	/* re-dirty it on the new master */
2947 	dlm_kick_thread(dlm, res);
2948 	wake_up(&res->wq);
2949 leave:
2950 	return ret;
2951 }
2952 
2953 /*
2954  * LOCKRES AST REFCOUNT
2955  * this is integral to migration
2956  */
2957 
2958 /* for future intent to call an ast, reserve one ahead of time.
2959  * this should be called only after waiting on the lockres
2960  * with dlm_wait_on_lockres, and while still holding the
2961  * spinlock after the call. */
2962 void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res)
2963 {
2964 	assert_spin_locked(&res->spinlock);
2965 	if (res->state & DLM_LOCK_RES_MIGRATING) {
2966 		__dlm_print_one_lock_resource(res);
2967 	}
2968 	BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
2969 
2970 	atomic_inc(&res->asts_reserved);
2971 }
2972 
2973 /*
2974  * used to drop the reserved ast, either because it went unused,
2975  * or because the ast/bast was actually called.
2976  *
2977  * also, if there is a pending migration on this lockres,
2978  * and this was the last pending ast on the lockres,
2979  * atomically set the MIGRATING flag before we drop the lock.
2980  * this is how we ensure that migration can proceed with no
2981  * asts in progress.  note that it is ok if the state of the
2982  * queues is such that a lock should be granted in the future
2983  * or that a bast should be fired, because the new master will
2984  * shuffle the lists on this lockres as soon as it is migrated.
2985  */
2986 void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
2987 			     struct dlm_lock_resource *res)
2988 {
2989 	if (!atomic_dec_and_lock(&res->asts_reserved, &res->spinlock))
2990 		return;
2991 
2992 	if (!res->migration_pending) {
2993 		spin_unlock(&res->spinlock);
2994 		return;
2995 	}
2996 
2997 	BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
2998 	res->migration_pending = 0;
2999 	res->state |= DLM_LOCK_RES_MIGRATING;
3000 	spin_unlock(&res->spinlock);
3001 	wake_up(&res->wq);
3002 	wake_up(&dlm->migration_wq);
3003 }
3004