xref: /linux/fs/ocfs2/dlm/dlmmaster.c (revision f3d9478b2ce468c3115b02ecae7e975990697f15)
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmmaster.c
5  *
6  * standalone DLM module
7  *
8  * Copyright (C) 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 02111-1307, USA.
24  *
25  */
26 
27 
28 #include <linux/module.h>
29 #include <linux/fs.h>
30 #include <linux/types.h>
31 #include <linux/slab.h>
32 #include <linux/highmem.h>
33 #include <linux/utsname.h>
34 #include <linux/init.h>
35 #include <linux/sysctl.h>
36 #include <linux/random.h>
37 #include <linux/blkdev.h>
38 #include <linux/socket.h>
39 #include <linux/inet.h>
40 #include <linux/spinlock.h>
41 #include <linux/delay.h>
42 
43 
44 #include "cluster/heartbeat.h"
45 #include "cluster/nodemanager.h"
46 #include "cluster/tcp.h"
47 
48 #include "dlmapi.h"
49 #include "dlmcommon.h"
50 #include "dlmdebug.h"
51 #include "dlmdomain.h"
52 
53 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_MASTER)
54 #include "cluster/masklog.h"
55 
56 enum dlm_mle_type {
57 	DLM_MLE_BLOCK,
58 	DLM_MLE_MASTER,
59 	DLM_MLE_MIGRATION
60 };
61 
62 struct dlm_lock_name
63 {
64 	u8 len;
65 	u8 name[DLM_LOCKID_NAME_MAX];
66 };
67 
68 struct dlm_master_list_entry
69 {
70 	struct list_head list;
71 	struct list_head hb_events;
72 	struct dlm_ctxt *dlm;
73 	spinlock_t spinlock;
74 	wait_queue_head_t wq;
75 	atomic_t woken;
76 	struct kref mle_refs;
77 	unsigned long maybe_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
78 	unsigned long vote_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
79 	unsigned long response_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
80 	unsigned long node_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
81 	u8 master;
82 	u8 new_master;
83 	enum dlm_mle_type type;
84 	struct o2hb_callback_func mle_hb_up;
85 	struct o2hb_callback_func mle_hb_down;
86 	union {
87 		struct dlm_lock_resource *res;
88 		struct dlm_lock_name name;
89 	} u;
90 };
91 
92 static void dlm_mle_node_down(struct dlm_ctxt *dlm,
93 			      struct dlm_master_list_entry *mle,
94 			      struct o2nm_node *node,
95 			      int idx);
96 static void dlm_mle_node_up(struct dlm_ctxt *dlm,
97 			    struct dlm_master_list_entry *mle,
98 			    struct o2nm_node *node,
99 			    int idx);
100 
101 static void dlm_assert_master_worker(struct dlm_work_item *item, void *data);
102 static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
103 				unsigned int namelen, void *nodemap,
104 				u32 flags);
105 
106 static inline int dlm_mle_equal(struct dlm_ctxt *dlm,
107 				struct dlm_master_list_entry *mle,
108 				const char *name,
109 				unsigned int namelen)
110 {
111 	struct dlm_lock_resource *res;
112 
113 	if (dlm != mle->dlm)
114 		return 0;
115 
116 	if (mle->type == DLM_MLE_BLOCK ||
117 	    mle->type == DLM_MLE_MIGRATION) {
118 		if (namelen != mle->u.name.len ||
119 		    memcmp(name, mle->u.name.name, namelen) != 0)
120 			return 0;
121 	} else {
122 		res = mle->u.res;
123 		if (namelen != res->lockname.len ||
124 		    memcmp(res->lockname.name, name, namelen) != 0)
125 			return 0;
126 	}
127 	return 1;
128 }
129 
130 #if 0
131 /* Code here is included but compiled out as it aids debugging */
132 
133 void dlm_print_one_mle(struct dlm_master_list_entry *mle)
134 {
135 	int i = 0, refs;
136 	char *type;
137 	char attached;
138 	u8 master;
139 	unsigned int namelen;
140 	const char *name;
141 	struct kref *k;
142 
143 	k = &mle->mle_refs;
144 	if (mle->type == DLM_MLE_BLOCK)
145 		type = "BLK";
146 	else if (mle->type == DLM_MLE_MASTER)
147 		type = "MAS";
148 	else
149 		type = "MIG";
150 	refs = atomic_read(&k->refcount);
151 	master = mle->master;
152 	attached = (list_empty(&mle->hb_events) ? 'N' : 'Y');
153 
154 	if (mle->type != DLM_MLE_MASTER) {
155 		namelen = mle->u.name.len;
156 		name = mle->u.name.name;
157 	} else {
158 		namelen = mle->u.res->lockname.len;
159 		name = mle->u.res->lockname.name;
160 	}
161 
162 	mlog(ML_NOTICE, "  #%3d: %3s  %3d  %3u   %3u %c    (%d)%.*s\n",
163 		  i, type, refs, master, mle->new_master, attached,
164 		  namelen, namelen, name);
165 }
166 
167 static void dlm_dump_mles(struct dlm_ctxt *dlm)
168 {
169 	struct dlm_master_list_entry *mle;
170 	struct list_head *iter;
171 
172 	mlog(ML_NOTICE, "dumping all mles for domain %s:\n", dlm->name);
173 	mlog(ML_NOTICE, "  ####: type refs owner new events? lockname nodemap votemap respmap maybemap\n");
174 	spin_lock(&dlm->master_lock);
175 	list_for_each(iter, &dlm->master_list) {
176 		mle = list_entry(iter, struct dlm_master_list_entry, list);
177 		dlm_print_one_mle(mle);
178 	}
179 	spin_unlock(&dlm->master_lock);
180 }
181 
182 int dlm_dump_all_mles(const char __user *data, unsigned int len)
183 {
184 	struct list_head *iter;
185 	struct dlm_ctxt *dlm;
186 
187 	spin_lock(&dlm_domain_lock);
188 	list_for_each(iter, &dlm_domains) {
189 		dlm = list_entry (iter, struct dlm_ctxt, list);
190 		mlog(ML_NOTICE, "found dlm: %p, name=%s\n", dlm, dlm->name);
191 		dlm_dump_mles(dlm);
192 	}
193 	spin_unlock(&dlm_domain_lock);
194 	return len;
195 }
196 EXPORT_SYMBOL_GPL(dlm_dump_all_mles);
197 
198 #endif  /*  0  */
199 
200 
201 static kmem_cache_t *dlm_mle_cache = NULL;
202 
203 
204 static void dlm_mle_release(struct kref *kref);
205 static void dlm_init_mle(struct dlm_master_list_entry *mle,
206 			enum dlm_mle_type type,
207 			struct dlm_ctxt *dlm,
208 			struct dlm_lock_resource *res,
209 			const char *name,
210 			unsigned int namelen);
211 static void dlm_put_mle(struct dlm_master_list_entry *mle);
212 static void __dlm_put_mle(struct dlm_master_list_entry *mle);
213 static int dlm_find_mle(struct dlm_ctxt *dlm,
214 			struct dlm_master_list_entry **mle,
215 			char *name, unsigned int namelen);
216 
217 static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to);
218 
219 
220 static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
221 				     struct dlm_lock_resource *res,
222 				     struct dlm_master_list_entry *mle,
223 				     int *blocked);
224 static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
225 				    struct dlm_lock_resource *res,
226 				    struct dlm_master_list_entry *mle,
227 				    int blocked);
228 static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
229 				 struct dlm_lock_resource *res,
230 				 struct dlm_master_list_entry *mle,
231 				 struct dlm_master_list_entry **oldmle,
232 				 const char *name, unsigned int namelen,
233 				 u8 new_master, u8 master);
234 
235 static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
236 				    struct dlm_lock_resource *res);
237 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
238 				      struct dlm_lock_resource *res);
239 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
240 				       struct dlm_lock_resource *res,
241 				       u8 target);
242 static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
243 				       struct dlm_lock_resource *res);
244 
245 
246 int dlm_is_host_down(int errno)
247 {
248 	switch (errno) {
249 		case -EBADF:
250 		case -ECONNREFUSED:
251 		case -ENOTCONN:
252 		case -ECONNRESET:
253 		case -EPIPE:
254 		case -EHOSTDOWN:
255 		case -EHOSTUNREACH:
256 		case -ETIMEDOUT:
257 		case -ECONNABORTED:
258 		case -ENETDOWN:
259 		case -ENETUNREACH:
260 		case -ENETRESET:
261 		case -ESHUTDOWN:
262 		case -ENOPROTOOPT:
263 		case -EINVAL:   /* if returned from our tcp code,
264 				   this means there is no socket */
265 			return 1;
266 	}
267 	return 0;
268 }
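
/*
 * Illustrative sketch (not built) of how callers in this file use
 * dlm_is_host_down(): a negative o2net status that it recognizes is
 * treated as node death and left to recovery, anything else is a bug on
 * this node.  The helper name below is hypothetical and exists only for
 * this example; compare dlm_do_master_request() and
 * dlm_do_assert_master() further down.
 */
#if 0
static void example_handle_net_status(int status, int to)
{
	if (status >= 0)
		return;		/* message was delivered */

	if (dlm_is_host_down(status)) {
		/* network error: assume the target node died */
		mlog(ML_ERROR, "link to %d went down!\n", to);
		return;
	}

	/* not a network error: indicates a programming error here */
	mlog_errno(status);
	BUG();
}
#endif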
269 
270 
271 /*
272  * MASTER LIST FUNCTIONS
273  */
274 
275 
276 /*
277  * regarding master list entries and heartbeat callbacks:
278  *
279  * in order to avoid sleeping and allocation that occurs in
280  * heartbeat, master list entries are simply attached to the
281  * dlm's established heartbeat callbacks.  the mle is attached
282  * when it is created, and since the dlm->spinlock is held at
283  * that time, any heartbeat event will be properly discovered
284  * by the mle.  the mle needs to be detached from the
285  * dlm->mle_hb_events list as soon as heartbeat events are no
286  * longer useful to the mle, and before the mle is freed.
287  *
288  * as a general rule, heartbeat events are no longer needed by
289  * the mle once an "answer" regarding the lock master has been
290  * received.
291  */
292 static inline void __dlm_mle_attach_hb_events(struct dlm_ctxt *dlm,
293 					      struct dlm_master_list_entry *mle)
294 {
295 	assert_spin_locked(&dlm->spinlock);
296 
297 	list_add_tail(&mle->hb_events, &dlm->mle_hb_events);
298 }
299 
300 
301 static inline void __dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
302 					      struct dlm_master_list_entry *mle)
303 {
304 	if (!list_empty(&mle->hb_events))
305 		list_del_init(&mle->hb_events);
306 }
307 
308 
309 static inline void dlm_mle_detach_hb_events(struct dlm_ctxt *dlm,
310 					    struct dlm_master_list_entry *mle)
311 {
312 	spin_lock(&dlm->spinlock);
313 	__dlm_mle_detach_hb_events(dlm, mle);
314 	spin_unlock(&dlm->spinlock);
315 }
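
/*
 * Illustrative sketch (not built) of the mle/heartbeat lifecycle the
 * comment above describes: the mle is attached while dlm->spinlock is
 * held at creation time (dlm_init_mle() does the attach), and detached
 * as soon as an answer about the master is known.  Insertion into
 * dlm->master_list under dlm->master_lock is elided; compare
 * dlm_get_lock_resource() below.
 */
#if 0
	spin_lock(&dlm->spinlock);
	dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
	spin_unlock(&dlm->spinlock);

	/* ... master request / assert_master exchange runs here ... */

	/* once the master is known, heartbeat events are useless to us */
	dlm_mle_detach_hb_events(dlm, mle);
	dlm_put_mle(mle);
#endif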
316 
317 /* remove from list and free */
318 static void __dlm_put_mle(struct dlm_master_list_entry *mle)
319 {
320 	struct dlm_ctxt *dlm;
321 	dlm = mle->dlm;
322 
323 	assert_spin_locked(&dlm->spinlock);
324 	assert_spin_locked(&dlm->master_lock);
325 	BUG_ON(!atomic_read(&mle->mle_refs.refcount));
326 
327 	kref_put(&mle->mle_refs, dlm_mle_release);
328 }
329 
330 
331 /* must not have any spinlocks coming in */
332 static void dlm_put_mle(struct dlm_master_list_entry *mle)
333 {
334 	struct dlm_ctxt *dlm;
335 	dlm = mle->dlm;
336 
337 	spin_lock(&dlm->spinlock);
338 	spin_lock(&dlm->master_lock);
339 	__dlm_put_mle(mle);
340 	spin_unlock(&dlm->master_lock);
341 	spin_unlock(&dlm->spinlock);
342 }
343 
344 static inline void dlm_get_mle(struct dlm_master_list_entry *mle)
345 {
346 	kref_get(&mle->mle_refs);
347 }
348 
349 static void dlm_init_mle(struct dlm_master_list_entry *mle,
350 			enum dlm_mle_type type,
351 			struct dlm_ctxt *dlm,
352 			struct dlm_lock_resource *res,
353 			const char *name,
354 			unsigned int namelen)
355 {
356 	assert_spin_locked(&dlm->spinlock);
357 
358 	mle->dlm = dlm;
359 	mle->type = type;
360 	INIT_LIST_HEAD(&mle->list);
361 	INIT_LIST_HEAD(&mle->hb_events);
362 	memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
363 	spin_lock_init(&mle->spinlock);
364 	init_waitqueue_head(&mle->wq);
365 	atomic_set(&mle->woken, 0);
366 	kref_init(&mle->mle_refs);
367 	memset(mle->response_map, 0, sizeof(mle->response_map));
368 	mle->master = O2NM_MAX_NODES;
369 	mle->new_master = O2NM_MAX_NODES;
370 
371 	if (mle->type == DLM_MLE_MASTER) {
372 		BUG_ON(!res);
373 		mle->u.res = res;
374 	} else if (mle->type == DLM_MLE_BLOCK) {
375 		BUG_ON(!name);
376 		memcpy(mle->u.name.name, name, namelen);
377 		mle->u.name.len = namelen;
378 	} else /* DLM_MLE_MIGRATION */ {
379 		BUG_ON(!name);
380 		memcpy(mle->u.name.name, name, namelen);
381 		mle->u.name.len = namelen;
382 	}
383 
384 	/* copy off the node_map and register hb callbacks on our copy */
385 	memcpy(mle->node_map, dlm->domain_map, sizeof(mle->node_map));
386 	memcpy(mle->vote_map, dlm->domain_map, sizeof(mle->vote_map));
387 	clear_bit(dlm->node_num, mle->vote_map);
388 	clear_bit(dlm->node_num, mle->node_map);
389 
390 	/* attach the mle to the domain node up/down events */
391 	__dlm_mle_attach_hb_events(dlm, mle);
392 }
393 
394 
395 /* returns 1 if found, 0 if not */
396 static int dlm_find_mle(struct dlm_ctxt *dlm,
397 			struct dlm_master_list_entry **mle,
398 			char *name, unsigned int namelen)
399 {
400 	struct dlm_master_list_entry *tmpmle;
401 	struct list_head *iter;
402 
403 	assert_spin_locked(&dlm->master_lock);
404 
405 	list_for_each(iter, &dlm->master_list) {
406 		tmpmle = list_entry(iter, struct dlm_master_list_entry, list);
407 		if (!dlm_mle_equal(dlm, tmpmle, name, namelen))
408 			continue;
409 		dlm_get_mle(tmpmle);
410 		*mle = tmpmle;
411 		return 1;
412 	}
413 	return 0;
414 }
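
/*
 * Illustrative sketch (not built) of the dlm_find_mle() contract: the
 * caller must hold dlm->master_lock, and a successful lookup returns the
 * mle with an extra reference that must later be dropped with
 * dlm_put_mle() after master_lock has been released (dlm_put_mle()
 * takes both dlm->spinlock and dlm->master_lock itself).
 */
#if 0
	struct dlm_master_list_entry *tmpmle = NULL;
	int found;

	spin_lock(&dlm->master_lock);
	found = dlm_find_mle(dlm, &tmpmle, name, namelen);
	spin_unlock(&dlm->master_lock);

	if (found) {
		/* ... inspect tmpmle under tmpmle->spinlock ... */
		dlm_put_mle(tmpmle);	/* drop the lookup reference */
	}
#endif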
415 
416 void dlm_hb_event_notify_attached(struct dlm_ctxt *dlm, int idx, int node_up)
417 {
418 	struct dlm_master_list_entry *mle;
419 	struct list_head *iter;
420 
421 	assert_spin_locked(&dlm->spinlock);
422 
423 	list_for_each(iter, &dlm->mle_hb_events) {
424 		mle = list_entry(iter, struct dlm_master_list_entry,
425 				 hb_events);
426 		if (node_up)
427 			dlm_mle_node_up(dlm, mle, NULL, idx);
428 		else
429 			dlm_mle_node_down(dlm, mle, NULL, idx);
430 	}
431 }
432 
433 static void dlm_mle_node_down(struct dlm_ctxt *dlm,
434 			      struct dlm_master_list_entry *mle,
435 			      struct o2nm_node *node, int idx)
436 {
437 	spin_lock(&mle->spinlock);
438 
439 	if (!test_bit(idx, mle->node_map))
440 		mlog(0, "node %u already removed from nodemap!\n", idx);
441 	else
442 		clear_bit(idx, mle->node_map);
443 
444 	spin_unlock(&mle->spinlock);
445 }
446 
447 static void dlm_mle_node_up(struct dlm_ctxt *dlm,
448 			    struct dlm_master_list_entry *mle,
449 			    struct o2nm_node *node, int idx)
450 {
451 	spin_lock(&mle->spinlock);
452 
453 	if (test_bit(idx, mle->node_map))
454 		mlog(0, "node %u already in node map!\n", idx);
455 	else
456 		set_bit(idx, mle->node_map);
457 
458 	spin_unlock(&mle->spinlock);
459 }
460 
461 
462 int dlm_init_mle_cache(void)
463 {
464 	dlm_mle_cache = kmem_cache_create("dlm_mle_cache",
465 					  sizeof(struct dlm_master_list_entry),
466 					  0, SLAB_HWCACHE_ALIGN,
467 					  NULL, NULL);
468 	if (dlm_mle_cache == NULL)
469 		return -ENOMEM;
470 	return 0;
471 }
472 
473 void dlm_destroy_mle_cache(void)
474 {
475 	if (dlm_mle_cache)
476 		kmem_cache_destroy(dlm_mle_cache);
477 }
478 
479 static void dlm_mle_release(struct kref *kref)
480 {
481 	struct dlm_master_list_entry *mle;
482 	struct dlm_ctxt *dlm;
483 
484 	mlog_entry_void();
485 
486 	mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
487 	dlm = mle->dlm;
488 
489 	if (mle->type != DLM_MLE_MASTER) {
490 		mlog(0, "calling mle_release for %.*s, type %d\n",
491 		     mle->u.name.len, mle->u.name.name, mle->type);
492 	} else {
493 		mlog(0, "calling mle_release for %.*s, type %d\n",
494 		     mle->u.res->lockname.len,
495 		     mle->u.res->lockname.name, mle->type);
496 	}
497 	assert_spin_locked(&dlm->spinlock);
498 	assert_spin_locked(&dlm->master_lock);
499 
500 	/* remove from list if not already */
501 	if (!list_empty(&mle->list))
502 		list_del_init(&mle->list);
503 
504 	/* detach the mle from the domain node up/down events */
505 	__dlm_mle_detach_hb_events(dlm, mle);
506 
507 	/* NOTE: kfree under spinlock here.
508 	 * if this is bad, we can move this to a freelist. */
509 	kmem_cache_free(dlm_mle_cache, mle);
510 }
511 
512 
513 /*
514  * LOCK RESOURCE FUNCTIONS
515  */
516 
517 static void dlm_set_lockres_owner(struct dlm_ctxt *dlm,
518 				  struct dlm_lock_resource *res,
519 				  u8 owner)
520 {
521 	assert_spin_locked(&res->spinlock);
522 
523 	mlog_entry("%.*s, %u\n", res->lockname.len, res->lockname.name, owner);
524 
525 	if (owner == dlm->node_num)
526 		atomic_inc(&dlm->local_resources);
527 	else if (owner == DLM_LOCK_RES_OWNER_UNKNOWN)
528 		atomic_inc(&dlm->unknown_resources);
529 	else
530 		atomic_inc(&dlm->remote_resources);
531 
532 	res->owner = owner;
533 }
534 
535 void dlm_change_lockres_owner(struct dlm_ctxt *dlm,
536 			      struct dlm_lock_resource *res, u8 owner)
537 {
538 	assert_spin_locked(&res->spinlock);
539 
540 	if (owner == res->owner)
541 		return;
542 
543 	if (res->owner == dlm->node_num)
544 		atomic_dec(&dlm->local_resources);
545 	else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN)
546 		atomic_dec(&dlm->unknown_resources);
547 	else
548 		atomic_dec(&dlm->remote_resources);
549 
550 	dlm_set_lockres_owner(dlm, res, owner);
551 }
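
/*
 * Illustrative sketch (not built) of the owner accounting above: a newly
 * initialized lockres is counted in dlm->unknown_resources, and
 * dlm_change_lockres_owner() moves that count to local_resources or
 * remote_resources when mastery is resolved.  res->spinlock must be held
 * for both calls.
 */
#if 0
	spin_lock(&res->spinlock);
	dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
	/* ... lock mastery completes and this node wins ... */
	dlm_change_lockres_owner(dlm, res, dlm->node_num);
	spin_unlock(&res->spinlock);
#endif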
552 
553 
554 static void dlm_lockres_release(struct kref *kref)
555 {
556 	struct dlm_lock_resource *res;
557 
558 	res = container_of(kref, struct dlm_lock_resource, refs);
559 
560 	/* This should not happen -- all lockres' have a name
561 	 * associated with them at init time. */
562 	BUG_ON(!res->lockname.name);
563 
564 	mlog(0, "destroying lockres %.*s\n", res->lockname.len,
565 	     res->lockname.name);
566 
567 	/* By the time we're ready to blow this guy away, we shouldn't
568 	 * be on any lists. */
569 	BUG_ON(!hlist_unhashed(&res->hash_node));
570 	BUG_ON(!list_empty(&res->granted));
571 	BUG_ON(!list_empty(&res->converting));
572 	BUG_ON(!list_empty(&res->blocked));
573 	BUG_ON(!list_empty(&res->dirty));
574 	BUG_ON(!list_empty(&res->recovering));
575 	BUG_ON(!list_empty(&res->purge));
576 
577 	kfree(res->lockname.name);
578 
579 	kfree(res);
580 }
581 
582 void dlm_lockres_get(struct dlm_lock_resource *res)
583 {
584 	kref_get(&res->refs);
585 }
586 
587 void dlm_lockres_put(struct dlm_lock_resource *res)
588 {
589 	kref_put(&res->refs, dlm_lockres_release);
590 }
591 
592 static void dlm_init_lockres(struct dlm_ctxt *dlm,
593 			     struct dlm_lock_resource *res,
594 			     const char *name, unsigned int namelen)
595 {
596 	char *qname;
597 
598 	/* If we memset here, we lose our reference to the kmalloc'd
599 	 * res->lockname.name, so be sure to init every field
600 	 * correctly! */
601 
602 	qname = (char *) res->lockname.name;
603 	memcpy(qname, name, namelen);
604 
605 	res->lockname.len = namelen;
606 	res->lockname.hash = full_name_hash(name, namelen);
607 
608 	init_waitqueue_head(&res->wq);
609 	spin_lock_init(&res->spinlock);
610 	INIT_HLIST_NODE(&res->hash_node);
611 	INIT_LIST_HEAD(&res->granted);
612 	INIT_LIST_HEAD(&res->converting);
613 	INIT_LIST_HEAD(&res->blocked);
614 	INIT_LIST_HEAD(&res->dirty);
615 	INIT_LIST_HEAD(&res->recovering);
616 	INIT_LIST_HEAD(&res->purge);
617 	atomic_set(&res->asts_reserved, 0);
618 	res->migration_pending = 0;
619 
620 	kref_init(&res->refs);
621 
622 	/* just for consistency */
623 	spin_lock(&res->spinlock);
624 	dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
625 	spin_unlock(&res->spinlock);
626 
627 	res->state = DLM_LOCK_RES_IN_PROGRESS;
628 
629 	res->last_used = 0;
630 
631 	memset(res->lvb, 0, DLM_LVB_LEN);
632 }
633 
634 struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
635 				   const char *name,
636 				   unsigned int namelen)
637 {
638 	struct dlm_lock_resource *res;
639 
640 	res = kmalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
641 	if (!res)
642 		return NULL;
643 
644 	res->lockname.name = kmalloc(namelen, GFP_KERNEL);
645 	if (!res->lockname.name) {
646 		kfree(res);
647 		return NULL;
648 	}
649 
650 	dlm_init_lockres(dlm, res, name, namelen);
651 	return res;
652 }
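
/*
 * Illustrative sketch (not built) of the lockres refcounting above:
 * dlm_new_lockres() returns a resource holding the single reference from
 * kref_init(); extra users take dlm_lockres_get(), and the final
 * dlm_lockres_put() frees both the copied name and the structure via
 * dlm_lockres_release().
 */
#if 0
	struct dlm_lock_resource *res;

	res = dlm_new_lockres(dlm, name, namelen);
	if (res) {
		dlm_lockres_get(res);	/* a second user takes a ref */
		dlm_lockres_put(res);	/* second user is done */
		dlm_lockres_put(res);	/* last put frees the lockres */
	}
#endif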
653 
654 /*
655  * lookup a lock resource by name.
656  * may already exist in the hashtable.
657  * lockid is null terminated
658  *
659  * if not, allocate enough for the lockres and for
660  * the temporary structure used in doing the mastering.
661  *
662  * also, do a lookup in the dlm->master_list to see
663  * if another node has begun mastering the same lock.
664  * if so, there should be a block entry in there
665  * for this name, and we should *not* attempt to master
666  * the lock here.   need to wait around for that node
667  * to assert_master (or die).
668  *
669  */
670 struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
671 					  const char *lockid,
672 					  int flags)
673 {
674 	struct dlm_lock_resource *tmpres=NULL, *res=NULL;
675 	struct dlm_master_list_entry *mle = NULL;
676 	struct dlm_master_list_entry *alloc_mle = NULL;
677 	int blocked = 0;
678 	int ret, nodenum;
679 	struct dlm_node_iter iter;
680 	unsigned int namelen;
681 	int tries = 0;
682 	int bit, wait_on_recovery = 0;
683 
684 	BUG_ON(!lockid);
685 
686 	namelen = strlen(lockid);
687 
688 	mlog(0, "get lockres %s (len %d)\n", lockid, namelen);
689 
690 lookup:
691 	spin_lock(&dlm->spinlock);
692 	tmpres = __dlm_lookup_lockres(dlm, lockid, namelen);
693 	if (tmpres) {
694 		spin_unlock(&dlm->spinlock);
695 		mlog(0, "found in hash!\n");
696 		if (res)
697 			dlm_lockres_put(res);
698 		res = tmpres;
699 		goto leave;
700 	}
701 
702 	if (!res) {
703 		spin_unlock(&dlm->spinlock);
704 		mlog(0, "allocating a new resource\n");
705 		/* nothing found and we need to allocate one. */
706 		alloc_mle = (struct dlm_master_list_entry *)
707 			kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL);
708 		if (!alloc_mle)
709 			goto leave;
710 		res = dlm_new_lockres(dlm, lockid, namelen);
711 		if (!res)
712 			goto leave;
713 		goto lookup;
714 	}
715 
716 	mlog(0, "no lockres found, allocated our own: %p\n", res);
717 
718 	if (flags & LKM_LOCAL) {
719 		/* caller knows it's safe to assume it's not mastered elsewhere
720 		 * DONE!  return right away */
721 		spin_lock(&res->spinlock);
722 		dlm_change_lockres_owner(dlm, res, dlm->node_num);
723 		__dlm_insert_lockres(dlm, res);
724 		spin_unlock(&res->spinlock);
725 		spin_unlock(&dlm->spinlock);
726 		/* lockres still marked IN_PROGRESS */
727 		goto wake_waiters;
728 	}
729 
730 	/* check master list to see if another node has started mastering it */
731 	spin_lock(&dlm->master_lock);
732 
733 	/* if we found a block, wait for lock to be mastered by another node */
734 	blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
735 	if (blocked) {
736 		if (mle->type == DLM_MLE_MASTER) {
737 			mlog(ML_ERROR, "master entry for nonexistent lock!\n");
738 			BUG();
739 		} else if (mle->type == DLM_MLE_MIGRATION) {
740 			/* migration is in progress! */
741 			/* the good news is that we now know the
742 			 * "current" master (mle->master). */
743 
744 			spin_unlock(&dlm->master_lock);
745 			assert_spin_locked(&dlm->spinlock);
746 
747 			/* set the lockres owner and hash it */
748 			spin_lock(&res->spinlock);
749 			dlm_set_lockres_owner(dlm, res, mle->master);
750 			__dlm_insert_lockres(dlm, res);
751 			spin_unlock(&res->spinlock);
752 			spin_unlock(&dlm->spinlock);
753 
754 			/* master is known, detach */
755 			dlm_mle_detach_hb_events(dlm, mle);
756 			dlm_put_mle(mle);
757 			mle = NULL;
758 			goto wake_waiters;
759 		}
760 	} else {
761 		/* go ahead and try to master lock on this node */
762 		mle = alloc_mle;
763 		/* make sure this does not get freed below */
764 		alloc_mle = NULL;
765 		dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
766 		set_bit(dlm->node_num, mle->maybe_map);
767 		list_add(&mle->list, &dlm->master_list);
768 
769 		/* still holding the dlm spinlock, check the recovery map
770 		 * to see if there are any nodes that still need to be
771 		 * considered.  these will not appear in the mle nodemap
772 		 * but they might own this lockres.  wait on them. */
773 		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
774 		if (bit < O2NM_MAX_NODES) {
775 			mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to "
776 			     "recover before lock mastery can begin\n",
777 			     dlm->name, namelen, (char *)lockid, bit);
778 			wait_on_recovery = 1;
779 		}
780 	}
781 
782 	/* at this point there is either a DLM_MLE_BLOCK or a
783 	 * DLM_MLE_MASTER on the master list, so it's safe to add the
784 	 * lockres to the hashtable.  anyone who finds the lock will
785 	 * still have to wait on the IN_PROGRESS. */
786 
787 	/* finally add the lockres to its hash bucket */
788 	__dlm_insert_lockres(dlm, res);
789 	/* get an extra ref on the mle in case this is a BLOCK
790 	 * if so, the creator of the BLOCK may try to put the last
791 	 * ref at this time in the assert master handler, so we
792 	 * need an extra one to keep from a bad ptr deref. */
793 	dlm_get_mle(mle);
794 	spin_unlock(&dlm->master_lock);
795 	spin_unlock(&dlm->spinlock);
796 
797 	while (wait_on_recovery) {
798 		/* any cluster changes that occurred after dropping the
799 		 * dlm spinlock would be detectable by a change on the mle,
800 		 * so we only need to clear out the recovery map once. */
801 		if (dlm_is_recovery_lock(lockid, namelen)) {
802 			mlog(ML_NOTICE, "%s: recovery map is not empty, but "
803 			     "must master $RECOVERY lock now\n", dlm->name);
804 			if (!dlm_pre_master_reco_lockres(dlm, res))
805 				wait_on_recovery = 0;
806 			else {
807 				mlog(0, "%s: waiting 500ms for heartbeat state "
808 				    "change\n", dlm->name);
809 				msleep(500);
810 			}
811 			continue;
812 		}
813 
814 		dlm_kick_recovery_thread(dlm);
815 		msleep(100);
816 		dlm_wait_for_recovery(dlm);
817 
818 		spin_lock(&dlm->spinlock);
819 		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
820 		if (bit < O2NM_MAX_NODES) {
821 			mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to "
822 			     "recover before lock mastery can begin\n",
823 			     dlm->name, namelen, (char *)lockid, bit);
824 			wait_on_recovery = 1;
825 		} else
826 			wait_on_recovery = 0;
827 		spin_unlock(&dlm->spinlock);
828 	}
829 
830 	/* must wait for lock to be mastered elsewhere */
831 	if (blocked)
832 		goto wait;
833 
834 redo_request:
835 	ret = -EINVAL;
836 	dlm_node_iter_init(mle->vote_map, &iter);
837 	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
838 		ret = dlm_do_master_request(mle, nodenum);
839 		if (ret < 0)
840 			mlog_errno(ret);
841 		if (mle->master != O2NM_MAX_NODES) {
842 			/* found a master ! */
843 			if (mle->master <= nodenum)
844 				break;
845 			/* if our master request has not reached the master
846 			 * yet, keep going until it does.  this is how the
847 			 * master will know that asserts are needed back to
848 			 * the lower nodes. */
849 			mlog(0, "%s:%.*s: requests only up to %u but master "
850 			     "is %u, keep going\n", dlm->name, namelen,
851 			     lockid, nodenum, mle->master);
852 		}
853 	}
854 
855 wait:
856 	/* keep going until the response map includes all nodes */
857 	ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
858 	if (ret < 0) {
859 		mlog(0, "%s:%.*s: node map changed, redo the "
860 		     "master request now, blocked=%d\n",
861 		     dlm->name, res->lockname.len,
862 		     res->lockname.name, blocked);
863 		if (++tries > 20) {
864 			mlog(ML_ERROR, "%s:%.*s: spinning on "
865 			     "dlm_wait_for_lock_mastery, blocked=%d\n",
866 			     dlm->name, res->lockname.len,
867 			     res->lockname.name, blocked);
868 			dlm_print_one_lock_resource(res);
869 			/* dlm_print_one_mle(mle); */
870 			tries = 0;
871 		}
872 		goto redo_request;
873 	}
874 
875 	mlog(0, "lockres mastered by %u\n", res->owner);
876 	/* make sure we never continue without this */
877 	BUG_ON(res->owner == O2NM_MAX_NODES);
878 
879 	/* master is known, detach if not already detached */
880 	dlm_mle_detach_hb_events(dlm, mle);
881 	dlm_put_mle(mle);
882 	/* put the extra ref */
883 	dlm_put_mle(mle);
884 
885 wake_waiters:
886 	spin_lock(&res->spinlock);
887 	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
888 	spin_unlock(&res->spinlock);
889 	wake_up(&res->wq);
890 
891 leave:
892 	/* need to free the unused mle */
893 	if (alloc_mle)
894 		kmem_cache_free(dlm_mle_cache, alloc_mle);
895 
896 	return res;
897 }
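
/*
 * Illustrative sketch (not built) of the caller pattern for
 * dlm_get_lock_resource(): lockid must be NUL terminated, the returned
 * lockres carries a reference the caller drops with dlm_lockres_put(),
 * and LKM_LOCAL short-circuits mastery when the caller knows no other
 * node can already hold the name.  The lockid/flags variables are
 * assumed to come from the caller.
 */
#if 0
	struct dlm_lock_resource *res;

	res = dlm_get_lock_resource(dlm, lockid, flags);
	if (!res) {
		mlog_errno(-ENOMEM);
		/* a real caller maps this to an appropriate dlm_status */
	} else {
		/* ... attach a new lock to res ... */
		dlm_lockres_put(res);
	}
#endif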
898 
899 
900 #define DLM_MASTERY_TIMEOUT_MS   5000
901 
902 static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
903 				     struct dlm_lock_resource *res,
904 				     struct dlm_master_list_entry *mle,
905 				     int *blocked)
906 {
907 	u8 m;
908 	int ret, bit;
909 	int map_changed, voting_done;
910 	int assert, sleep;
911 
912 recheck:
913 	ret = 0;
914 	assert = 0;
915 
916 	/* check if another node has already become the owner */
917 	spin_lock(&res->spinlock);
918 	if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
919 		mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
920 		     res->lockname.len, res->lockname.name, res->owner);
921 		spin_unlock(&res->spinlock);
922 		/* this will cause the master to re-assert across
923 		 * the whole cluster, freeing up mles */
924 		ret = dlm_do_master_request(mle, res->owner);
925 		if (ret < 0) {
926 			/* give recovery a chance to run */
927 			mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);
928 			msleep(500);
929 			goto recheck;
930 		}
931 		ret = 0;
932 		goto leave;
933 	}
934 	spin_unlock(&res->spinlock);
935 
936 	spin_lock(&mle->spinlock);
937 	m = mle->master;
938 	map_changed = (memcmp(mle->vote_map, mle->node_map,
939 			      sizeof(mle->vote_map)) != 0);
940 	voting_done = (memcmp(mle->vote_map, mle->response_map,
941 			     sizeof(mle->vote_map)) == 0);
942 
943 	/* restart if we hit any errors */
944 	if (map_changed) {
945 		int b;
946 		mlog(0, "%s: %.*s: node map changed, restarting\n",
947 		     dlm->name, res->lockname.len, res->lockname.name);
948 		ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
949 		b = (mle->type == DLM_MLE_BLOCK);
950 		if ((*blocked && !b) || (!*blocked && b)) {
951 			mlog(0, "%s:%.*s: status change: old=%d new=%d\n",
952 			     dlm->name, res->lockname.len, res->lockname.name,
953 			     *blocked, b);
954 			*blocked = b;
955 		}
956 		spin_unlock(&mle->spinlock);
957 		if (ret < 0) {
958 			mlog_errno(ret);
959 			goto leave;
960 		}
961 		mlog(0, "%s:%.*s: restart lock mastery succeeded, "
962 		     "rechecking now\n", dlm->name, res->lockname.len,
963 		     res->lockname.name);
964 		goto recheck;
965 	}
966 
967 	if (m != O2NM_MAX_NODES) {
968 		/* another node has done an assert!
969 		 * all done! */
970 		sleep = 0;
971 	} else {
972 		sleep = 1;
973 		/* have all nodes responded? */
974 		if (voting_done && !*blocked) {
975 			bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
976 			if (dlm->node_num <= bit) {
977 				/* my node number is lowest.
978 			 	 * now tell other nodes that I am
979 				 * mastering this. */
980 				mle->master = dlm->node_num;
981 				assert = 1;
982 				sleep = 0;
983 			}
984 			/* if voting is done, but we have not received
985 			 * an assert master yet, we must sleep */
986 		}
987 	}
988 
989 	spin_unlock(&mle->spinlock);
990 
991 	/* sleep if we haven't finished voting yet */
992 	if (sleep) {
993 		unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);
994 
995 		/*
996 		if (atomic_read(&mle->mle_refs.refcount) < 2)
997 			mlog(ML_ERROR, "mle (%p) refs=%d, name=%.*s\n", mle,
998 			atomic_read(&mle->mle_refs.refcount),
999 			res->lockname.len, res->lockname.name);
1000 		*/
1001 		atomic_set(&mle->woken, 0);
1002 		(void)wait_event_timeout(mle->wq,
1003 					 (atomic_read(&mle->woken) == 1),
1004 					 timeo);
1005 		if (res->owner == O2NM_MAX_NODES) {
1006 			mlog(0, "waiting again\n");
1007 			goto recheck;
1008 		}
1009 		mlog(0, "done waiting, master is %u\n", res->owner);
1010 		ret = 0;
1011 		goto leave;
1012 	}
1013 
1014 	ret = 0;   /* done */
1015 	if (assert) {
1016 		m = dlm->node_num;
1017 		mlog(0, "about to master %.*s here, this=%u\n",
1018 		     res->lockname.len, res->lockname.name, m);
1019 		ret = dlm_do_assert_master(dlm, res->lockname.name,
1020 					   res->lockname.len, mle->vote_map, 0);
1021 		if (ret) {
1022 			/* This is a failure in the network path,
1023 			 * not in the response to the assert_master
1024 			 * (any nonzero response is a BUG on this node).
1025 			 * Most likely a socket just got disconnected
1026 			 * due to node death. */
1027 			mlog_errno(ret);
1028 		}
1029 		/* no longer need to restart lock mastery.
1030 		 * all living nodes have been contacted. */
1031 		ret = 0;
1032 	}
1033 
1034 	/* set the lockres owner */
1035 	spin_lock(&res->spinlock);
1036 	dlm_change_lockres_owner(dlm, res, m);
1037 	spin_unlock(&res->spinlock);
1038 
1039 leave:
1040 	return ret;
1041 }
1042 
1043 struct dlm_bitmap_diff_iter
1044 {
1045 	int curnode;
1046 	unsigned long *orig_bm;
1047 	unsigned long *cur_bm;
1048 	unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
1049 };
1050 
1051 enum dlm_node_state_change
1052 {
1053 	NODE_DOWN = -1,
1054 	NODE_NO_CHANGE = 0,
1055 	NODE_UP
1056 };
1057 
1058 static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
1059 				      unsigned long *orig_bm,
1060 				      unsigned long *cur_bm)
1061 {
1062 	unsigned long p1, p2;
1063 	int i;
1064 
1065 	iter->curnode = -1;
1066 	iter->orig_bm = orig_bm;
1067 	iter->cur_bm = cur_bm;
1068 
1069 	for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
1070 		p1 = *(iter->orig_bm + i);
1071 		p2 = *(iter->cur_bm + i);
1072 		iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
1073 	}
1074 }
1075 
1076 static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
1077 				     enum dlm_node_state_change *state)
1078 {
1079 	int bit;
1080 
1081 	if (iter->curnode >= O2NM_MAX_NODES)
1082 		return -ENOENT;
1083 
1084 	bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
1085 			    iter->curnode+1);
1086 	if (bit >= O2NM_MAX_NODES) {
1087 		iter->curnode = O2NM_MAX_NODES;
1088 		return -ENOENT;
1089 	}
1090 
1091 	/* if it was there in the original then this node died */
1092 	if (test_bit(bit, iter->orig_bm))
1093 		*state = NODE_DOWN;
1094 	else
1095 		*state = NODE_UP;
1096 
1097 	iter->curnode = bit;
1098 	return bit;
1099 }
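
/*
 * Illustrative sketch (not built) of the diff iterator above: diff_bm is
 * simply orig_bm XOR cur_bm, so iteration visits exactly the nodes whose
 * membership changed and reports whether each came up or went down
 * relative to the original bitmap.  Compare dlm_restart_lock_mastery()
 * below for the real use.
 */
#if 0
	struct dlm_bitmap_diff_iter bdi;
	enum dlm_node_state_change sc;
	int node;

	dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
	while ((node = dlm_bitmap_diff_iter_next(&bdi, &sc)) >= 0) {
		if (sc == NODE_UP)
			mlog(0, "node %d came up since voting began\n", node);
		else	/* NODE_DOWN */
			mlog(0, "node %d went down since voting began\n", node);
	}
#endif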
1100 
1101 
1102 static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
1103 				    struct dlm_lock_resource *res,
1104 				    struct dlm_master_list_entry *mle,
1105 				    int blocked)
1106 {
1107 	struct dlm_bitmap_diff_iter bdi;
1108 	enum dlm_node_state_change sc;
1109 	int node;
1110 	int ret = 0;
1111 
1112 	mlog(0, "something happened such that the "
1113 	     "master process may need to be restarted!\n");
1114 
1115 	assert_spin_locked(&mle->spinlock);
1116 
1117 	dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
1118 	node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1119 	while (node >= 0) {
1120 		if (sc == NODE_UP) {
1121 			/* a node came up.  clear any old vote from
1122 			 * the response map and set it in the vote map
1123 			 * then restart the mastery. */
1124 			mlog(ML_NOTICE, "node %d up while restarting\n", node);
1125 
1126 			/* redo the master request, but only for the new node */
1127 			mlog(0, "sending request to new node\n");
1128 			clear_bit(node, mle->response_map);
1129 			set_bit(node, mle->vote_map);
1130 		} else {
1131 			mlog(ML_ERROR, "node down! %d\n", node);
1132 
1133 			/* if the node wasn't involved in mastery skip it,
1134 			 * but clear it out from the maps so that it will
1135 			 * not affect mastery of this lockres */
1136 			clear_bit(node, mle->response_map);
1137 			clear_bit(node, mle->vote_map);
1138 			if (!test_bit(node, mle->maybe_map))
1139 				goto next;
1140 
1141 			/* if we're already blocked on lock mastery, and the
1142 			 * dead node wasn't the expected master, or there is
1143 			 * another node in the maybe_map, keep waiting */
1144 			if (blocked) {
1145 				int lowest = find_next_bit(mle->maybe_map,
1146 						       O2NM_MAX_NODES, 0);
1147 
1148 				/* act like it was never there */
1149 				clear_bit(node, mle->maybe_map);
1150 
1151 			       	if (node != lowest)
1152 					goto next;
1153 
1154 				mlog(ML_ERROR, "expected master %u died while "
1155 				     "this node was blocked waiting on it!\n",
1156 				     node);
1157 				lowest = find_next_bit(mle->maybe_map,
1158 						       O2NM_MAX_NODES,
1159 						       lowest+1);
1160 				if (lowest < O2NM_MAX_NODES) {
1161 					mlog(0, "still blocked. waiting "
1162 					     "on %u now\n", lowest);
1163 					goto next;
1164 				}
1165 
1166 				/* mle is an MLE_BLOCK, but there is now
1167 				 * nothing left to block on.  we need to return
1168 				 * all the way back out and try again with
1169 				 * an MLE_MASTER. dlm_do_local_recovery_cleanup
1170 				 * has already run, so the mle refcount is ok */
1171 				mlog(0, "no longer blocking. we can "
1172 				     "try to master this here\n");
1173 				mle->type = DLM_MLE_MASTER;
1174 				memset(mle->maybe_map, 0,
1175 				       sizeof(mle->maybe_map));
1176 				memset(mle->response_map, 0,
1177 				       sizeof(mle->response_map));
1178 				memcpy(mle->vote_map, mle->node_map,
1179 				       sizeof(mle->node_map));
1180 				mle->u.res = res;
1181 				set_bit(dlm->node_num, mle->maybe_map);
1182 
1183 				ret = -EAGAIN;
1184 				goto next;
1185 			}
1186 
1187 			clear_bit(node, mle->maybe_map);
1188 			if (node > dlm->node_num)
1189 				goto next;
1190 
1191 			mlog(0, "dead node in map!\n");
1192 			/* yuck. go back and re-contact all nodes
1193 			 * in the vote_map, removing this node. */
1194 			memset(mle->response_map, 0,
1195 			       sizeof(mle->response_map));
1196 		}
1197 		ret = -EAGAIN;
1198 next:
1199 		node = dlm_bitmap_diff_iter_next(&bdi, &sc);
1200 	}
1201 	return ret;
1202 }
1203 
1204 
1205 /*
1206  * DLM_MASTER_REQUEST_MSG
1207  *
1208  * returns: 0 on success,
1209  *          -errno on a network error
1210  *
1211  * on error, the caller should assume the target node is "dead"
1212  *
1213  */
1214 
1215 static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to)
1216 {
1217 	struct dlm_ctxt *dlm = mle->dlm;
1218 	struct dlm_master_request request;
1219 	int ret, response=0, resend;
1220 
1221 	memset(&request, 0, sizeof(request));
1222 	request.node_idx = dlm->node_num;
1223 
1224 	BUG_ON(mle->type == DLM_MLE_MIGRATION);
1225 
1226 	if (mle->type != DLM_MLE_MASTER) {
1227 		request.namelen = mle->u.name.len;
1228 		memcpy(request.name, mle->u.name.name, request.namelen);
1229 	} else {
1230 		request.namelen = mle->u.res->lockname.len;
1231 		memcpy(request.name, mle->u.res->lockname.name,
1232 			request.namelen);
1233 	}
1234 
1235 again:
1236 	ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
1237 				 sizeof(request), to, &response);
1238 	if (ret < 0)  {
1239 		if (ret == -ESRCH) {
1240 			/* should never happen */
1241 			mlog(ML_ERROR, "TCP stack not ready!\n");
1242 			BUG();
1243 		} else if (ret == -EINVAL) {
1244 			mlog(ML_ERROR, "bad args passed to o2net!\n");
1245 			BUG();
1246 		} else if (ret == -ENOMEM) {
1247 			mlog(ML_ERROR, "out of memory while trying to send "
1248 			     "network message!  retrying\n");
1249 			/* this is totally crude */
1250 			msleep(50);
1251 			goto again;
1252 		} else if (!dlm_is_host_down(ret)) {
1253 			/* not a network error. bad. */
1254 			mlog_errno(ret);
1255 			mlog(ML_ERROR, "unhandled error!\n");
1256 			BUG();
1257 		}
1258 		/* all other errors should be network errors,
1259 		 * and likely indicate node death */
1260 		mlog(ML_ERROR, "link to %d went down!\n", to);
1261 		goto out;
1262 	}
1263 
1264 	ret = 0;
1265 	resend = 0;
1266 	spin_lock(&mle->spinlock);
1267 	switch (response) {
1268 		case DLM_MASTER_RESP_YES:
1269 			set_bit(to, mle->response_map);
1270 			mlog(0, "node %u is the master, response=YES\n", to);
1271 			mle->master = to;
1272 			break;
1273 		case DLM_MASTER_RESP_NO:
1274 			mlog(0, "node %u not master, response=NO\n", to);
1275 			set_bit(to, mle->response_map);
1276 			break;
1277 		case DLM_MASTER_RESP_MAYBE:
1278 			mlog(0, "node %u not master, response=MAYBE\n", to);
1279 			set_bit(to, mle->response_map);
1280 			set_bit(to, mle->maybe_map);
1281 			break;
1282 		case DLM_MASTER_RESP_ERROR:
1283 			mlog(0, "node %u hit an error, resending\n", to);
1284 			resend = 1;
1285 			response = 0;
1286 			break;
1287 		default:
1288 			mlog(ML_ERROR, "bad response! %u\n", response);
1289 			BUG();
1290 	}
1291 	spin_unlock(&mle->spinlock);
1292 	if (resend) {
1293 		/* this is also totally crude */
1294 		msleep(50);
1295 		goto again;
1296 	}
1297 
1298 out:
1299 	return ret;
1300 }
1301 
1302 /*
1303  * locks that can be taken here:
1304  * dlm->spinlock
1305  * res->spinlock
1306  * mle->spinlock
1307  * dlm->master_list
1308  *
1309  * if possible, TRIM THIS DOWN!!!
1310  */
1311 int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
1312 {
1313 	u8 response = DLM_MASTER_RESP_MAYBE;
1314 	struct dlm_ctxt *dlm = data;
1315 	struct dlm_lock_resource *res = NULL;
1316 	struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
1317 	struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
1318 	char *name;
1319 	unsigned int namelen;
1320 	int found, ret;
1321 	int set_maybe;
1322 	int dispatch_assert = 0;
1323 
1324 	if (!dlm_grab(dlm))
1325 		return DLM_MASTER_RESP_NO;
1326 
1327 	if (!dlm_domain_fully_joined(dlm)) {
1328 		response = DLM_MASTER_RESP_NO;
1329 		goto send_response;
1330 	}
1331 
1332 	name = request->name;
1333 	namelen = request->namelen;
1334 
1335 	if (namelen > DLM_LOCKID_NAME_MAX) {
1336 		response = DLM_IVBUFLEN;
1337 		goto send_response;
1338 	}
1339 
1340 way_up_top:
1341 	spin_lock(&dlm->spinlock);
1342 	res = __dlm_lookup_lockres(dlm, name, namelen);
1343 	if (res) {
1344 		spin_unlock(&dlm->spinlock);
1345 
1346 		/* take care of the easy cases up front */
1347 		spin_lock(&res->spinlock);
1348 		if (res->state & DLM_LOCK_RES_RECOVERING) {
1349 			spin_unlock(&res->spinlock);
1350 			mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
1351 			     "being recovered\n");
1352 			response = DLM_MASTER_RESP_ERROR;
1353 			if (mle)
1354 				kmem_cache_free(dlm_mle_cache, mle);
1355 			goto send_response;
1356 		}
1357 
1358 		if (res->owner == dlm->node_num) {
1359 			spin_unlock(&res->spinlock);
1360 			// mlog(0, "this node is the master\n");
1361 			response = DLM_MASTER_RESP_YES;
1362 			if (mle)
1363 				kmem_cache_free(dlm_mle_cache, mle);
1364 
1365 			/* this node is the owner.
1366 			 * there is some extra work that needs to
1367 			 * happen now.  the requesting node has
1368 			 * caused all nodes up to this one to
1369 			 * create mles.  this node now needs to
1370 			 * go back and clean those up. */
1371 			dispatch_assert = 1;
1372 			goto send_response;
1373 		} else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1374 			spin_unlock(&res->spinlock);
1375 			// mlog(0, "node %u is the master\n", res->owner);
1376 			response = DLM_MASTER_RESP_NO;
1377 			if (mle)
1378 				kmem_cache_free(dlm_mle_cache, mle);
1379 			goto send_response;
1380 		}
1381 
1382 		/* ok, there is no owner.  either this node is
1383 		 * being blocked, or it is actively trying to
1384 		 * master this lock. */
1385 		if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1386 			mlog(ML_ERROR, "lock with no owner should be "
1387 			     "in-progress!\n");
1388 			BUG();
1389 		}
1390 
1391 		// mlog(0, "lockres is in progress...\n");
1392 		spin_lock(&dlm->master_lock);
1393 		found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1394 		if (!found) {
1395 			mlog(ML_ERROR, "no mle found for this lock!\n");
1396 			BUG();
1397 		}
1398 		set_maybe = 1;
1399 		spin_lock(&tmpmle->spinlock);
1400 		if (tmpmle->type == DLM_MLE_BLOCK) {
1401 			// mlog(0, "this node is waiting for "
1402 			// "lockres to be mastered\n");
1403 			response = DLM_MASTER_RESP_NO;
1404 		} else if (tmpmle->type == DLM_MLE_MIGRATION) {
1405 			mlog(0, "node %u is master, but trying to migrate to "
1406 			     "node %u.\n", tmpmle->master, tmpmle->new_master);
1407 			if (tmpmle->master == dlm->node_num) {
1408 				response = DLM_MASTER_RESP_YES;
1409 				mlog(ML_ERROR, "no owner on lockres, but this "
1410 				     "node is trying to migrate it to %u?!\n",
1411 				     tmpmle->new_master);
1412 				BUG();
1413 			} else {
1414 				/* the real master can respond on its own */
1415 				response = DLM_MASTER_RESP_NO;
1416 			}
1417 		} else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
1418 			set_maybe = 0;
1419 			if (tmpmle->master == dlm->node_num) {
1420 				response = DLM_MASTER_RESP_YES;
1421 				/* this node will be the owner.
1422 				 * go back and clean the mles on any
1423 				 * other nodes */
1424 				dispatch_assert = 1;
1425 			} else
1426 				response = DLM_MASTER_RESP_NO;
1427 		} else {
1428 			// mlog(0, "this node is attempting to "
1429 			// "master lockres\n");
1430 			response = DLM_MASTER_RESP_MAYBE;
1431 		}
1432 		if (set_maybe)
1433 			set_bit(request->node_idx, tmpmle->maybe_map);
1434 		spin_unlock(&tmpmle->spinlock);
1435 
1436 		spin_unlock(&dlm->master_lock);
1437 		spin_unlock(&res->spinlock);
1438 
1439 		/* keep the mle attached to heartbeat events */
1440 		dlm_put_mle(tmpmle);
1441 		if (mle)
1442 			kmem_cache_free(dlm_mle_cache, mle);
1443 		goto send_response;
1444 	}
1445 
1446 	/*
1447 	 * lockres doesn't exist on this node
1448 	 * if there is an MLE_BLOCK, return NO
1449 	 * if there is an MLE_MASTER, return MAYBE
1450 	 * otherwise, add an MLE_BLOCK, return NO
1451 	 */
1452 	spin_lock(&dlm->master_lock);
1453 	found = dlm_find_mle(dlm, &tmpmle, name, namelen);
1454 	if (!found) {
1455 		/* this lockid has never been seen on this node yet */
1456 		// mlog(0, "no mle found\n");
1457 		if (!mle) {
1458 			spin_unlock(&dlm->master_lock);
1459 			spin_unlock(&dlm->spinlock);
1460 
1461 			mle = (struct dlm_master_list_entry *)
1462 				kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL);
1463 			if (!mle) {
1464 				response = DLM_MASTER_RESP_ERROR;
1465 				mlog_errno(-ENOMEM);
1466 				goto send_response;
1467 			}
1468 			spin_lock(&dlm->spinlock);
1469 			dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL,
1470 					 name, namelen);
1471 			spin_unlock(&dlm->spinlock);
1472 			goto way_up_top;
1473 		}
1474 
1475 		// mlog(0, "this is second time thru, already allocated, "
1476 		// "add the block.\n");
1477 		set_bit(request->node_idx, mle->maybe_map);
1478 		list_add(&mle->list, &dlm->master_list);
1479 		response = DLM_MASTER_RESP_NO;
1480 	} else {
1481 		// mlog(0, "mle was found\n");
1482 		set_maybe = 1;
1483 		spin_lock(&tmpmle->spinlock);
1484 		if (tmpmle->master == dlm->node_num) {
1485 			mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n");
1486 			BUG();
1487 		}
1488 		if (tmpmle->type == DLM_MLE_BLOCK)
1489 			response = DLM_MASTER_RESP_NO;
1490 		else if (tmpmle->type == DLM_MLE_MIGRATION) {
1491 			mlog(0, "migration mle was found (%u->%u)\n",
1492 			     tmpmle->master, tmpmle->new_master);
1493 			/* real master can respond on its own */
1494 			response = DLM_MASTER_RESP_NO;
1495 		} else
1496 			response = DLM_MASTER_RESP_MAYBE;
1497 		if (set_maybe)
1498 			set_bit(request->node_idx, tmpmle->maybe_map);
1499 		spin_unlock(&tmpmle->spinlock);
1500 	}
1501 	spin_unlock(&dlm->master_lock);
1502 	spin_unlock(&dlm->spinlock);
1503 
1504 	if (found) {
1505 		/* keep the mle attached to heartbeat events */
1506 		dlm_put_mle(tmpmle);
1507 	}
1508 send_response:
1509 
1510 	if (dispatch_assert) {
1511 		if (response != DLM_MASTER_RESP_YES)
1512 			mlog(ML_ERROR, "invalid response %d\n", response);
1513 		if (!res) {
1514 			mlog(ML_ERROR, "bad lockres while trying to assert!\n");
1515 			BUG();
1516 		}
1517 		mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
1518 			     dlm->node_num, res->lockname.len, res->lockname.name);
1519 		ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx,
1520 						 DLM_ASSERT_MASTER_MLE_CLEANUP);
1521 		if (ret < 0) {
1522 			mlog(ML_ERROR, "failed to dispatch assert master work\n");
1523 			response = DLM_MASTER_RESP_ERROR;
1524 		}
1525 	}
1526 
1527 	dlm_put(dlm);
1528 	return response;
1529 }
1530 
1531 /*
1532  * DLM_ASSERT_MASTER_MSG
1533  */
1534 
1535 
1536 /*
1537  * NOTE: this can be used for debugging
1538  * can periodically run all locks owned by this node
1539  * and re-assert across the cluster...
1540  */
1541 static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
1542 				unsigned int namelen, void *nodemap,
1543 				u32 flags)
1544 {
1545 	struct dlm_assert_master assert;
1546 	int to, tmpret;
1547 	struct dlm_node_iter iter;
1548 	int ret = 0;
1549 	int reassert;
1550 
1551 	BUG_ON(namelen > O2NM_MAX_NAME_LEN);
1552 again:
1553 	reassert = 0;
1554 
1555 	/* note that if this nodemap is empty, it returns 0 */
1556 	dlm_node_iter_init(nodemap, &iter);
1557 	while ((to = dlm_node_iter_next(&iter)) >= 0) {
1558 		int r = 0;
1559 		mlog(0, "sending assert master to %d (%.*s)\n", to,
1560 		     namelen, lockname);
1561 		memset(&assert, 0, sizeof(assert));
1562 		assert.node_idx = dlm->node_num;
1563 		assert.namelen = namelen;
1564 		memcpy(assert.name, lockname, namelen);
1565 		assert.flags = cpu_to_be32(flags);
1566 
1567 		tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
1568 					    &assert, sizeof(assert), to, &r);
1569 		if (tmpret < 0) {
1570 			mlog(ML_ERROR, "assert_master returned %d!\n", tmpret);
1571 			if (!dlm_is_host_down(tmpret)) {
1572 				mlog(ML_ERROR, "unhandled error!\n");
1573 				BUG();
1574 			}
1575 			/* a node died.  finish out the rest of the nodes. */
1576 			mlog(ML_ERROR, "link to %d went down!\n", to);
1577 			/* any nonzero status return will do */
1578 			ret = tmpret;
1579 		} else if (r < 0) {
1580 			/* ok, something is horribly messed up.  kill thyself. */
1581 			mlog(ML_ERROR, "during assert master of %.*s to %u, "
1582 			     "got %d.\n", namelen, lockname, to, r);
1583 			dlm_dump_lock_resources(dlm);
1584 			BUG();
1585 		} else if (r == EAGAIN) {
1586 			mlog(0, "%.*s: node %u created mles on other "
1587 			     "nodes and requests a re-assert\n",
1588 			     namelen, lockname, to);
1589 			reassert = 1;
1590 		}
1591 	}
1592 
1593 	if (reassert)
1594 		goto again;
1595 
1596 	return ret;
1597 }
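
/*
 * Illustrative sketch (not built) of the debugging use suggested in the
 * comment above dlm_do_assert_master(): re-assert mastery of a locally
 * owned lockres to every other live node.  The function name is
 * hypothetical, and walking the lockres hash to find all locally owned
 * resources is elided; compare dlm_assert_master_worker() below for how
 * the nodemap is built in the real code.
 */
#if 0
static void example_reassert_one(struct dlm_ctxt *dlm,
				 struct dlm_lock_resource *res)
{
	unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];

	spin_lock(&dlm->spinlock);
	memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
	spin_unlock(&dlm->spinlock);
	clear_bit(dlm->node_num, nodemap);

	if (res->owner == dlm->node_num)
		dlm_do_assert_master(dlm, res->lockname.name,
				     res->lockname.len, nodemap, 0);
}
#endif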
1598 
1599 /*
1600  * locks that can be taken here:
1601  * dlm->spinlock
1602  * res->spinlock
1603  * mle->spinlock
1604  * dlm->master_list
1605  *
1606  * if possible, TRIM THIS DOWN!!!
1607  */
1608 int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
1609 {
1610 	struct dlm_ctxt *dlm = data;
1611 	struct dlm_master_list_entry *mle = NULL;
1612 	struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
1613 	struct dlm_lock_resource *res = NULL;
1614 	char *name;
1615 	unsigned int namelen;
1616 	u32 flags;
1617 	int master_request = 0;
1618 	int ret = 0;
1619 
1620 	if (!dlm_grab(dlm))
1621 		return 0;
1622 
1623 	name = assert->name;
1624 	namelen = assert->namelen;
1625 	flags = be32_to_cpu(assert->flags);
1626 
1627 	if (namelen > DLM_LOCKID_NAME_MAX) {
1628 		mlog(ML_ERROR, "Invalid name length!\n");
1629 		goto done;
1630 	}
1631 
1632 	spin_lock(&dlm->spinlock);
1633 
1634 	if (flags)
1635 		mlog(0, "assert_master with flags: %u\n", flags);
1636 
1637 	/* find the MLE */
1638 	spin_lock(&dlm->master_lock);
1639 	if (!dlm_find_mle(dlm, &mle, name, namelen)) {
1640 		/* not an error, could be master just re-asserting */
1641 		mlog(0, "just got an assert_master from %u, but no "
1642 		     "MLE for it! (%.*s)\n", assert->node_idx,
1643 		     namelen, name);
1644 	} else {
1645 		int bit = find_next_bit (mle->maybe_map, O2NM_MAX_NODES, 0);
1646 		if (bit >= O2NM_MAX_NODES) {
1647 			/* not necessarily an error, though less likely.
1648 			 * could be master just re-asserting. */
1649 			mlog(ML_ERROR, "no bits set in the maybe_map, but %u "
1650 			     "is asserting! (%.*s)\n", assert->node_idx,
1651 			     namelen, name);
1652 		} else if (bit != assert->node_idx) {
1653 			if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
1654 				mlog(0, "master %u was found, %u should "
1655 				     "back off\n", assert->node_idx, bit);
1656 			} else {
1657 				/* with the fix for bug 569, a higher node
1658 				 * number winning the mastery will respond
1659 				 * YES to mastery requests, but this node
1660 				 * had no way of knowing.  let it pass. */
1661 				mlog(ML_ERROR, "%u is the lowest node, "
1662 				     "%u is asserting. (%.*s)  %u must "
1663 				     "have begun after %u won.\n", bit,
1664 				     assert->node_idx, namelen, name, bit,
1665 				     assert->node_idx);
1666 			}
1667 		}
1668 	}
1669 	spin_unlock(&dlm->master_lock);
1670 
1671 	/* ok everything checks out with the MLE
1672 	 * now check to see if there is a lockres */
1673 	res = __dlm_lookup_lockres(dlm, name, namelen);
1674 	if (res) {
1675 		spin_lock(&res->spinlock);
1676 		if (res->state & DLM_LOCK_RES_RECOVERING)  {
1677 			mlog(ML_ERROR, "%u asserting but %.*s is "
1678 			     "RECOVERING!\n", assert->node_idx, namelen, name);
1679 			goto kill;
1680 		}
1681 		if (!mle) {
1682 			if (res->owner != assert->node_idx) {
1683 				mlog(ML_ERROR, "assert_master from "
1684 					  "%u, but current owner is "
1685 					  "%u! (%.*s)\n",
1686 				       assert->node_idx, res->owner,
1687 				       namelen, name);
1688 				goto kill;
1689 			}
1690 		} else if (mle->type != DLM_MLE_MIGRATION) {
1691 			if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
1692 				/* owner is just re-asserting */
1693 				if (res->owner == assert->node_idx) {
1694 					mlog(0, "owner %u re-asserting on "
1695 					     "lock %.*s\n", assert->node_idx,
1696 					     namelen, name);
1697 					goto ok;
1698 				}
1699 				mlog(ML_ERROR, "got assert_master from "
1700 				     "node %u, but %u is the owner! "
1701 				     "(%.*s)\n", assert->node_idx,
1702 				     res->owner, namelen, name);
1703 				goto kill;
1704 			}
1705 			if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
1706 				mlog(ML_ERROR, "got assert from %u, but lock "
1707 				     "with no owner should be "
1708 				     "in-progress! (%.*s)\n",
1709 				     assert->node_idx,
1710 				     namelen, name);
1711 				goto kill;
1712 			}
1713 		} else /* mle->type == DLM_MLE_MIGRATION */ {
1714 			/* should only be getting an assert from new master */
1715 			if (assert->node_idx != mle->new_master) {
1716 				mlog(ML_ERROR, "got assert from %u, but "
1717 				     "new master is %u, and old master "
1718 				     "was %u (%.*s)\n",
1719 				     assert->node_idx, mle->new_master,
1720 				     mle->master, namelen, name);
1721 				goto kill;
1722 			}
1723 
1724 		}
1725 ok:
1726 		spin_unlock(&res->spinlock);
1727 	}
1728 	spin_unlock(&dlm->spinlock);
1729 
1730 	// mlog(0, "woo!  got an assert_master from node %u!\n",
1731 	// 	     assert->node_idx);
1732 	if (mle) {
1733 		int extra_ref = 0;
1734 		int nn = -1;
1735 
1736 		spin_lock(&mle->spinlock);
1737 		if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
1738 			extra_ref = 1;
1739 		else {
1740 			/* MASTER mle: if any bits set in the response map
1741 			 * then the calling node needs to re-assert to clear
1742 			 * up nodes that this node contacted */
1743 			while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES,
1744 						    nn+1)) < O2NM_MAX_NODES) {
1745 				if (nn != dlm->node_num && nn != assert->node_idx)
1746 					master_request = 1;
1747 			}
1748 		}
1749 		mle->master = assert->node_idx;
1750 		atomic_set(&mle->woken, 1);
1751 		wake_up(&mle->wq);
1752 		spin_unlock(&mle->spinlock);
1753 
1754 		if (mle->type == DLM_MLE_MIGRATION && res) {
1755 			mlog(0, "finishing off migration of lockres %.*s, "
1756 			     "from %u to %u\n",
1757 			       res->lockname.len, res->lockname.name,
1758 			       dlm->node_num, mle->new_master);
1759 			spin_lock(&res->spinlock);
1760 			res->state &= ~DLM_LOCK_RES_MIGRATING;
1761 			dlm_change_lockres_owner(dlm, res, mle->new_master);
1762 			BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
1763 			spin_unlock(&res->spinlock);
1764 		}
1765 		/* master is known, detach if not already detached */
1766 		dlm_mle_detach_hb_events(dlm, mle);
1767 		dlm_put_mle(mle);
1768 
1769 		if (extra_ref) {
1770 			/* the assert master message now balances the extra
1771 		 	 * ref given by the master / migration request message.
1772 		 	 * if this is the last put, it will be removed
1773 		 	 * from the list. */
1774 			dlm_put_mle(mle);
1775 		}
1776 	}
1777 
1778 done:
1779 	ret = 0;
1780 	if (res)
1781 		dlm_lockres_put(res);
1782 	dlm_put(dlm);
1783 	if (master_request) {
1784 		mlog(0, "need to tell master to reassert\n");
1785 		ret = EAGAIN;  // positive. negative would shoot down the node.
1786 	}
1787 	return ret;
1788 
1789 kill:
1790 	/* kill the caller! */
1791 	spin_unlock(&res->spinlock);
1792 	spin_unlock(&dlm->spinlock);
1793 	dlm_lockres_put(res);
1794 	mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
1795 	     "and killing the other node now!  This node is OK and can continue.\n");
1796 	dlm_dump_lock_resources(dlm);
1797 	dlm_put(dlm);
1798 	return -EINVAL;
1799 }
1800 
1801 int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
1802 			       struct dlm_lock_resource *res,
1803 			       int ignore_higher, u8 request_from, u32 flags)
1804 {
1805 	struct dlm_work_item *item;
1806 	item = kcalloc(1, sizeof(*item), GFP_KERNEL);
1807 	if (!item)
1808 		return -ENOMEM;
1809 
1810 
1811 	/* queue up work for dlm_assert_master_worker */
1812 	dlm_grab(dlm);  /* get an extra ref for the work item */
1813 	dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
1814 	item->u.am.lockres = res; /* already have a ref */
1815 	/* can optionally ignore node numbers higher than this node */
1816 	item->u.am.ignore_higher = ignore_higher;
1817 	item->u.am.request_from = request_from;
1818 	item->u.am.flags = flags;
1819 
1820 	if (ignore_higher)
1821 		mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len,
1822 		     res->lockname.name);
1823 
1824 	spin_lock(&dlm->work_lock);
1825 	list_add_tail(&item->list, &dlm->work_list);
1826 	spin_unlock(&dlm->work_lock);
1827 
1828 	schedule_work(&dlm->dispatched_work);
1829 	return 0;
1830 }
1831 
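/* Work function behind dlm_dispatch_assert_master().  It snapshots the
 * domain map, clears this node (and, when ignore_higher is set, the
 * requesting node plus every higher-numbered node) from the copy, then
 * broadcasts the assert with dlm_do_assert_master().  The lockres
 * reference taken by the dispatcher is dropped here. */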
1832 static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
1833 {
1834 	struct dlm_ctxt *dlm = data;
1835 	int ret = 0;
1836 	struct dlm_lock_resource *res;
1837 	unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
1838 	int ignore_higher;
1839 	int bit;
1840 	u8 request_from;
1841 	u32 flags;
1842 
1843 	dlm = item->dlm;
1844 	res = item->u.am.lockres;
1845 	ignore_higher = item->u.am.ignore_higher;
1846 	request_from = item->u.am.request_from;
1847 	flags = item->u.am.flags;
1848 
1849 	spin_lock(&dlm->spinlock);
1850 	memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
1851 	spin_unlock(&dlm->spinlock);
1852 
1853 	clear_bit(dlm->node_num, nodemap);
1854 	if (ignore_higher) {
1855 		/* if this is just to clear up mles for nodes below
1856 		 * this node, do not send the message to the original
1857 		 * caller or any node number higher than this */
1858 		clear_bit(request_from, nodemap);
1859 		bit = dlm->node_num;
1860 		while (1) {
1861 			bit = find_next_bit(nodemap, O2NM_MAX_NODES,
1862 					    bit+1);
1863 			if (bit >= O2NM_MAX_NODES)
1864 				break;
1865 			clear_bit(bit, nodemap);
1866 		}
1867 	}
1868 
1869 	/* this call now finishes out the nodemap
1870 	 * even if one or more nodes die */
1871 	mlog(0, "worker about to master %.*s here, this=%u\n",
1872 		     res->lockname.len, res->lockname.name, dlm->node_num);
1873 	ret = dlm_do_assert_master(dlm, res->lockname.name,
1874 				   res->lockname.len,
1875 				   nodemap, flags);
1876 	if (ret < 0) {
1877 		/* no need to restart, we are done */
1878 		mlog_errno(ret);
1879 	}
1880 
1881 	dlm_lockres_put(res);
1882 
1883 	mlog(0, "finished with dlm_assert_master_worker\n");
1884 }
1885 
1886 /* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
1887  * We cannot wait for node recovery to complete to begin mastering this
1888  * lockres because this lockres is used to kick off recovery! ;-)
1889  * So, do a pre-check on all living nodes to see if any of those nodes
1890  * think that $RECOVERY is currently mastered by a dead node.  If so,
1891  * we wait a short time to allow that node to get notified by its own
1892  * heartbeat stack, then check again.  All $RECOVERY lock resources
1893  * mastered by dead nodes are purged when the heartbeat callback is
1894  * fired, so we can know for sure that it is safe to continue once
1895  * the node returns a live node or no node.  */
1896 static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
1897 				       struct dlm_lock_resource *res)
1898 {
1899 	struct dlm_node_iter iter;
1900 	int nodenum;
1901 	int ret = 0;
1902 	u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;
1903 
1904 	spin_lock(&dlm->spinlock);
1905 	dlm_node_iter_init(dlm->domain_map, &iter);
1906 	spin_unlock(&dlm->spinlock);
1907 
1908 	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
1909 		/* do not send to self */
1910 		if (nodenum == dlm->node_num)
1911 			continue;
1912 		ret = dlm_do_master_requery(dlm, res, nodenum, &master);
1913 		if (ret < 0) {
1914 			mlog_errno(ret);
1915 			if (!dlm_is_host_down(ret))
1916 				BUG();
1917 			/* host is down, so answer for that node would be
1918 			 * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
1919 		}
1920 
1921 		if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
1922 			/* check to see if this master is in the recovery map */
1923 			spin_lock(&dlm->spinlock);
1924 			if (test_bit(master, dlm->recovery_map)) {
1925 				mlog(ML_NOTICE, "%s: node %u has not seen "
1926 				     "node %u go down yet, and thinks the "
1927 				     "dead node is mastering the recovery "
1928 				     "lock.  must wait.\n", dlm->name,
1929 				     nodenum, master);
1930 				ret = -EAGAIN;
1931 			}
1932 			spin_unlock(&dlm->spinlock);
1933 			mlog(0, "%s: reco lock master is %u\n", dlm->name,
1934 			     master);
1935 			break;
1936 		}
1937 	}
1938 	return ret;
1939 }
1940 
1941 
1942 /*
1943  * DLM_MIGRATE_LOCKRES
1944  */
1945 
1946 
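/* Rough flow of dlm_migrate_lockres(), as implemented below:
 *
 *   1. verify this node owns the lockres and holds no local locks on it
 *   2. preallocate the migratable lockres page and a migration mle
 *   3. pick (or validate) a live target node
 *   4. add the migration mle under dlm->spinlock and dlm->master_lock
 *   5. set DLM_LOCK_RES_MIGRATING once all pending asts are flushed
 *   6. send the lock state to the target with DLM_MRES_MIGRATION
 *   7. wait for the target's assert_master, then switch the owner,
 *      free the nonlocal dlm_lock structures and drop the mle
 */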
1947 int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
1948 			u8 target)
1949 {
1950 	struct dlm_master_list_entry *mle = NULL;
1951 	struct dlm_master_list_entry *oldmle = NULL;
1952 	struct dlm_migratable_lockres *mres = NULL;
1953 	int ret = -EINVAL;
1954 	const char *name;
1955 	unsigned int namelen;
1956 	int mle_added = 0;
1957 	struct list_head *queue, *iter;
1958 	int i;
1959 	struct dlm_lock *lock;
1960 	int empty = 1;
1961 
1962 	if (!dlm_grab(dlm))
1963 		return -EINVAL;
1964 
1965 	name = res->lockname.name;
1966 	namelen = res->lockname.len;
1967 
1968 	mlog(0, "migrating %.*s to %u\n", namelen, name, target);
1969 
1970 	/*
1971 	 * ensure this lockres is a proper candidate for migration
1972 	 */
1973 	spin_lock(&res->spinlock);
1974 	if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
1975 		mlog(0, "cannot migrate lockres with unknown owner!\n");
1976 		spin_unlock(&res->spinlock);
1977 		goto leave;
1978 	}
1979 	if (res->owner != dlm->node_num) {
1980 		mlog(0, "cannot migrate lockres this node doesn't own!\n");
1981 		spin_unlock(&res->spinlock);
1982 		goto leave;
1983 	}
1984 	mlog(0, "checking queues...\n");
1985 	queue = &res->granted;
1986 	for (i=0; i<3; i++) {
1987 		list_for_each(iter, queue) {
1988 			lock = list_entry (iter, struct dlm_lock, list);
1989 			empty = 0;
1990 			if (lock->ml.node == dlm->node_num) {
1991 				mlog(0, "found a lock owned by this node "
1992 				     "still on the %s queue!  will not "
1993 				     "migrate this lockres\n",
1994 				     i==0 ? "granted" :
1995 				     (i==1 ? "converting" : "blocked"));
1996 				spin_unlock(&res->spinlock);
1997 				ret = -ENOTEMPTY;
1998 				goto leave;
1999 			}
2000 		}
2001 		queue++;
2002 	}
2003 	mlog(0, "all locks on this lockres are nonlocal.  continuing\n");
2004 	spin_unlock(&res->spinlock);
2005 
2006 	/* no work to do */
2007 	if (empty) {
2008 		mlog(0, "no locks were found on this lockres! done!\n");
2009 		ret = 0;
2010 		goto leave;
2011 	}
2012 
2013 	/*
2014 	 * preallocate up front
2015 	 * if this fails, abort
2016 	 */
2017 
2018 	ret = -ENOMEM;
2019 	mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_KERNEL);
2020 	if (!mres) {
2021 		mlog_errno(ret);
2022 		goto leave;
2023 	}
2024 
2025 	mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
2026 								GFP_KERNEL);
2027 	if (!mle) {
2028 		mlog_errno(ret);
2029 		goto leave;
2030 	}
2031 	ret = 0;
2032 
2033 	/*
2034 	 * find a node to migrate the lockres to
2035 	 */
2036 
2037 	mlog(0, "picking a migration node\n");
2038 	spin_lock(&dlm->spinlock);
2039 	/* pick a new node */
2040 	if (target >= O2NM_MAX_NODES ||
2041 	    !test_bit(target, dlm->domain_map)) {
2042 		target = dlm_pick_migration_target(dlm, res);
2043 	}
2044 	mlog(0, "node %u chosen for migration\n", target);
2045 
2046 	if (target >= O2NM_MAX_NODES ||
2047 	    !test_bit(target, dlm->domain_map)) {
2048 		/* target chosen is not alive */
2049 		ret = -EINVAL;
2050 	}
2051 
2052 	if (ret) {
2053 		spin_unlock(&dlm->spinlock);
2054 		goto fail;
2055 	}
2056 
2057 	mlog(0, "continuing with target = %u\n", target);
2058 
2059 	/*
2060 	 * clear any existing master requests and
2061 	 * add the migration mle to the list
2062 	 */
2063 	spin_lock(&dlm->master_lock);
2064 	ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
2065 				    namelen, target, dlm->node_num);
2066 	spin_unlock(&dlm->master_lock);
2067 	spin_unlock(&dlm->spinlock);
2068 
2069 	if (ret == -EEXIST) {
2070 		mlog(0, "another process is already migrating it\n");
2071 		goto fail;
2072 	}
2073 	mle_added = 1;
2074 
2075 	/*
2076 	 * set the MIGRATING flag and flush asts
2077 	 * if we fail after this we need to re-dirty the lockres
2078 	 */
2079 	if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
2080 		mlog(ML_ERROR, "tried to migrate %.*s to %u, but "
2081 		     "the target went down.\n", res->lockname.len,
2082 		     res->lockname.name, target);
2083 		spin_lock(&res->spinlock);
2084 		res->state &= ~DLM_LOCK_RES_MIGRATING;
2085 		spin_unlock(&res->spinlock);
2086 		ret = -EINVAL;
2087 	}
2088 
2089 fail:
2090 	if (oldmle) {
2091 		/* master is known, detach if not already detached */
2092 		dlm_mle_detach_hb_events(dlm, oldmle);
2093 		dlm_put_mle(oldmle);
2094 	}
2095 
2096 	if (ret < 0) {
2097 		if (mle_added) {
2098 			dlm_mle_detach_hb_events(dlm, mle);
2099 			dlm_put_mle(mle);
2100 		} else if (mle) {
2101 			kmem_cache_free(dlm_mle_cache, mle);
2102 		}
2103 		goto leave;
2104 	}
2105 
2106 	/*
2107 	 * at this point, we have a migration target, an mle
2108 	 * in the master list, and the MIGRATING flag set on
2109 	 * the lockres
2110 	 */
2111 
2112 
2113 	/* get an extra reference on the mle.
2114 	 * otherwise the assert_master from the new
2115 	 * master will destroy this.
2116 	 * also, make sure that all callers of dlm_get_mle
2117 	 * take both dlm->spinlock and dlm->master_lock */
2118 	spin_lock(&dlm->spinlock);
2119 	spin_lock(&dlm->master_lock);
2120 	dlm_get_mle(mle);
2121 	spin_unlock(&dlm->master_lock);
2122 	spin_unlock(&dlm->spinlock);
2123 
2124 	/* notify new node and send all lock state */
2125 	/* call send_one_lockres with migration flag.
2126 	 * this serves as notice to the target node that a
2127 	 * migration is starting. */
2128 	ret = dlm_send_one_lockres(dlm, res, mres, target,
2129 				   DLM_MRES_MIGRATION);
2130 
2131 	if (ret < 0) {
2132 		mlog(0, "migration to node %u failed with %d\n",
2133 		     target, ret);
2134 		/* migration failed, detach and clean up mle */
2135 		dlm_mle_detach_hb_events(dlm, mle);
2136 		dlm_put_mle(mle);
2137 		dlm_put_mle(mle);
2138 		goto leave;
2139 	}
2140 
2141 	/* at this point, the target sends a message to all nodes,
2142 	 * (using dlm_do_migrate_request).  this node is skipped since
2143 	 * we had to put an mle in the list to begin the process.  this
2144 	 * node now waits for target to do an assert master.  this node
2145 	 * will be the last one notified, ensuring that the migration
2146 	 * is complete everywhere.  if the target dies while this is
2147 	 * going on, some nodes could potentially see the target as the
2148 	 * master, so it is important that my recovery finds the migration
2149 	 * mle and sets the master to UNKNOWN. */
2150 
2151 
2152 	/* wait for new node to assert master */
2153 	while (1) {
2154 		ret = wait_event_interruptible_timeout(mle->wq,
2155 					(atomic_read(&mle->woken) == 1),
2156 					msecs_to_jiffies(5000));
2157 
2158 		if (ret >= 0) {
2159 			if (atomic_read(&mle->woken) == 1 ||
2160 			    res->owner == target)
2161 				break;
2162 
2163 			mlog(0, "timed out during migration\n");
2164 			/* avoid hang during shutdown when migrating lockres
2165 			 * to a node which also goes down */
2166 			if (dlm_is_node_dead(dlm, target)) {
2167 				mlog(0, "%s:%.*s: expected migration target %u "
2168 				     "is no longer up.  restarting.\n",
2169 				     dlm->name, res->lockname.len,
2170 				     res->lockname.name, target);
2171 				ret = -ERESTARTSYS;
2172 			}
2173 		}
2174 		if (ret == -ERESTARTSYS) {
2175 			/* migration failed, detach and clean up mle */
2176 			dlm_mle_detach_hb_events(dlm, mle);
2177 			dlm_put_mle(mle);
2178 			dlm_put_mle(mle);
2179 			goto leave;
2180 		}
2181 		/* TODO: if node died: stop, clean up, return error */
2182 	}
2183 
2184 	/* all done, set the owner, clear the flag */
2185 	spin_lock(&res->spinlock);
2186 	dlm_set_lockres_owner(dlm, res, target);
2187 	res->state &= ~DLM_LOCK_RES_MIGRATING;
2188 	dlm_remove_nonlocal_locks(dlm, res);
2189 	spin_unlock(&res->spinlock);
2190 	wake_up(&res->wq);
2191 
2192 	/* master is known, detach if not already detached */
2193 	dlm_mle_detach_hb_events(dlm, mle);
2194 	dlm_put_mle(mle);
2195 	ret = 0;
2196 
2197 	dlm_lockres_calc_usage(dlm, res);
2198 
2199 leave:
2200 	/* re-dirty the lockres if we failed */
2201 	if (ret < 0)
2202 		dlm_kick_thread(dlm, res);
2203 
2204 	/* TODO: cleanup */
2205 	if (mres)
2206 		free_page((unsigned long)mres);
2207 
2208 	dlm_put(dlm);
2209 
2210 	mlog(0, "returning %d\n", ret);
2211 	return ret;
2212 }
2213 EXPORT_SYMBOL_GPL(dlm_migrate_lockres);
2214 
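/* Returns nonzero once this lock has no queued or pending basts.  Both
 * dlm->ast_lock and the lock spinlock are held so the answer is stable
 * against the ast/bast delivery path; presumably callers use it as a
 * wait_event()-style condition while draining basts. */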
2215 int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
2216 {
2217 	int ret;
2218 	spin_lock(&dlm->ast_lock);
2219 	spin_lock(&lock->spinlock);
2220 	ret = (list_empty(&lock->bast_list) && !lock->bast_pending);
2221 	spin_unlock(&lock->spinlock);
2222 	spin_unlock(&dlm->ast_lock);
2223 	return ret;
2224 }
2225 
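/* Wait condition used by dlm_mark_lockres_migrating() below: true once
 * the MIGRATING flag has been set by the final ast release, or once the
 * migration target has dropped out of the domain map (in which case the
 * caller must notice the dead target and abort the migration). */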
2226 static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
2227 				     struct dlm_lock_resource *res,
2228 				     u8 mig_target)
2229 {
2230 	int can_proceed;
2231 	spin_lock(&res->spinlock);
2232 	can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);
2233 	spin_unlock(&res->spinlock);
2234 
2235 	/* target has died, so make the caller break out of the
2236 	 * wait_event, but caller must recheck the domain_map */
2237 	spin_lock(&dlm->spinlock);
2238 	if (!test_bit(mig_target, dlm->domain_map))
2239 		can_proceed = 1;
2240 	spin_unlock(&dlm->spinlock);
2241 	return can_proceed;
2242 }
2243 
2244 int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
2245 {
2246 	int ret;
2247 	spin_lock(&res->spinlock);
2248 	ret = !!(res->state & DLM_LOCK_RES_DIRTY);
2249 	spin_unlock(&res->spinlock);
2250 	return ret;
2251 }
2252 
2253 
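/* Note that DLM_LOCK_RES_MIGRATING is never set directly here.  This
 * function marks migration_pending, takes one extra ast reservation and
 * then releases it; dlm_lockres_release_ast() (at the bottom of this
 * file) sets the MIGRATING flag only when asts_reserved reaches zero,
 * which guarantees there are no asts in flight when migration begins. */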
2254 static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
2255 				       struct dlm_lock_resource *res,
2256 				       u8 target)
2257 {
2258 	int ret = 0;
2259 
2260 	mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u\n",
2261 	       res->lockname.len, res->lockname.name, dlm->node_num,
2262 	       target);
2263 	/* need to set MIGRATING flag on lockres.  this is done by
2264 	 * ensuring that all asts have been flushed for this lockres. */
2265 	spin_lock(&res->spinlock);
2266 	BUG_ON(res->migration_pending);
2267 	res->migration_pending = 1;
2268 	/* strategy is to reserve an extra ast then release
2269 	 * it below, letting the release do all of the work */
2270 	__dlm_lockres_reserve_ast(res);
2271 	spin_unlock(&res->spinlock);
2272 
2273 	/* now flush all the pending asts.. hang out for a bit */
2274 	dlm_kick_thread(dlm, res);
2275 	wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
2276 	dlm_lockres_release_ast(dlm, res);
2277 
2278 	mlog(0, "about to wait on migration_wq, dirty=%s\n",
2279 	       res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
2280 	/* if the extra ref we just put was the final one, this
2281 	 * will pass thru immediately.  otherwise, we need to wait
2282 	 * for the last ast to finish. */
2283 again:
2284 	ret = wait_event_interruptible_timeout(dlm->migration_wq,
2285 		   dlm_migration_can_proceed(dlm, res, target),
2286 		   msecs_to_jiffies(1000));
2287 	if (ret < 0) {
2288 		mlog(0, "woken again: migrating? %s, dead? %s\n",
2289 		       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2290 		       test_bit(target, dlm->domain_map) ? "no":"yes");
2291 	} else {
2292 		mlog(0, "all is well: migrating? %s, dead? %s\n",
2293 		       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
2294 		       test_bit(target, dlm->domain_map) ? "no":"yes");
2295 	}
2296 	if (!dlm_migration_can_proceed(dlm, res, target)) {
2297 		mlog(0, "trying again...\n");
2298 		goto again;
2299 	}
2300 
2301 	/* did the target go down or die? */
2302 	spin_lock(&dlm->spinlock);
2303 	if (!test_bit(target, dlm->domain_map)) {
2304 		mlog(ML_ERROR, "aha. migration target %u just went down\n",
2305 		     target);
2306 		ret = -EHOSTDOWN;
2307 	}
2308 	spin_unlock(&dlm->spinlock);
2309 
2310 	/*
2311 	 * at this point:
2312 	 *
2313 	 *   o the DLM_LOCK_RES_MIGRATING flag is set
2314 	 *   o there are no pending asts on this lockres
2315 	 *   o all processes trying to reserve an ast on this
2316 	 *     lockres must wait for the MIGRATING flag to clear
2317 	 */
2318 	return ret;
2319 }
2320 
2321 /* last step in the migration process.
2322  * original master calls this to free all of the dlm_lock
2323  * structures that used to be for other nodes. */
2324 static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
2325 				      struct dlm_lock_resource *res)
2326 {
2327 	struct list_head *iter, *iter2;
2328 	struct list_head *queue = &res->granted;
2329 	int i;
2330 	struct dlm_lock *lock;
2331 
2332 	assert_spin_locked(&res->spinlock);
2333 
2334 	BUG_ON(res->owner == dlm->node_num);
2335 
2336 	for (i=0; i<3; i++) {
2337 		list_for_each_safe(iter, iter2, queue) {
2338 			lock = list_entry (iter, struct dlm_lock, list);
2339 			if (lock->ml.node != dlm->node_num) {
2340 				mlog(0, "putting lock for node %u\n",
2341 				     lock->ml.node);
2342 				/* be extra careful */
2343 				BUG_ON(!list_empty(&lock->ast_list));
2344 				BUG_ON(!list_empty(&lock->bast_list));
2345 				BUG_ON(lock->ast_pending);
2346 				BUG_ON(lock->bast_pending);
2347 				list_del_init(&lock->list);
2348 				dlm_lock_put(lock);
2349 			}
2350 		}
2351 		queue++;
2352 	}
2353 }
2354 
2355 /* for now this is not too intelligent.  we will
2356  * need stats to make this do the right thing.
2357  * this just finds the first lock on one of the
2358  * queues and uses that node as the target. */
2359 static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
2360 				    struct dlm_lock_resource *res)
2361 {
2362 	int i;
2363 	struct list_head *queue = &res->granted;
2364 	struct list_head *iter;
2365 	struct dlm_lock *lock;
2366 	int nodenum;
2367 
2368 	assert_spin_locked(&dlm->spinlock);
2369 
2370 	spin_lock(&res->spinlock);
2371 	for (i=0; i<3; i++) {
2372 		list_for_each(iter, queue) {
2373 			/* up to the caller to make sure this node
2374 			 * is alive */
2375 			lock = list_entry (iter, struct dlm_lock, list);
2376 			if (lock->ml.node != dlm->node_num) {
2377 				spin_unlock(&res->spinlock);
2378 				return lock->ml.node;
2379 			}
2380 		}
2381 		queue++;
2382 	}
2383 	spin_unlock(&res->spinlock);
2384 	mlog(0, "have not found a suitable target yet! checking domain map\n");
2385 
2386 	/* ok now we're getting desperate.  pick anyone alive. */
2387 	nodenum = -1;
2388 	while (1) {
2389 		nodenum = find_next_bit(dlm->domain_map,
2390 					O2NM_MAX_NODES, nodenum+1);
2391 		mlog(0, "found %d in domain map\n", nodenum);
2392 		if (nodenum >= O2NM_MAX_NODES)
2393 			break;
2394 		if (nodenum != dlm->node_num) {
2395 			mlog(0, "picking %d\n", nodenum);
2396 			return nodenum;
2397 		}
2398 	}
2399 
2400 	mlog(0, "giving up.  no master to migrate to\n");
2401 	return DLM_LOCK_RES_OWNER_UNKNOWN;
2402 }
2403 
2404 
2405 
2406 /* this is called by the new master once all lockres
2407  * data has been received */
2408 static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
2409 				  struct dlm_lock_resource *res,
2410 				  u8 master, u8 new_master,
2411 				  struct dlm_node_iter *iter)
2412 {
2413 	struct dlm_migrate_request migrate;
2414 	int ret, status = 0;
2415 	int nodenum;
2416 
2417 	memset(&migrate, 0, sizeof(migrate));
2418 	migrate.namelen = res->lockname.len;
2419 	memcpy(migrate.name, res->lockname.name, migrate.namelen);
2420 	migrate.new_master = new_master;
2421 	migrate.master = master;
2422 
2423 	ret = 0;
2424 
2425 	/* send message to all nodes, except the master and myself */
2426 	while ((nodenum = dlm_node_iter_next(iter)) >= 0) {
2427 		if (nodenum == master ||
2428 		    nodenum == new_master)
2429 			continue;
2430 
2431 		ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
2432 					 &migrate, sizeof(migrate), nodenum,
2433 					 &status);
2434 		if (ret < 0)
2435 			mlog_errno(ret);
2436 		else if (status < 0) {
2437 			mlog(0, "migrate request (node %u) returned %d!\n",
2438 			     nodenum, status);
2439 			ret = status;
2440 		}
2441 	}
2442 
2443 	if (ret < 0)
2444 		mlog_errno(ret);
2445 
2446 	mlog(0, "returning ret=%d\n", ret);
2447 	return ret;
2448 }
2449 
2450 
2451 /* if there is an existing mle for this lockres, we now know who the master is.
2452  * (the one who sent us *this* message) we can clear it up right away.
2453  * since the process that put the mle on the list still has a reference to it,
2454  * we can unhash it now, set the master and wake the process.  as a result,
2455  * we will have no mle in the list to start with.  now we can add an mle for
2456  * the migration and this should be the only one found for those scanning the
2457  * list.  */
2458 int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data)
2459 {
2460 	struct dlm_ctxt *dlm = data;
2461 	struct dlm_lock_resource *res = NULL;
2462 	struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
2463 	struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
2464 	const char *name;
2465 	unsigned int namelen;
2466 	int ret = 0;
2467 
2468 	if (!dlm_grab(dlm))
2469 		return -EINVAL;
2470 
2471 	name = migrate->name;
2472 	namelen = migrate->namelen;
2473 
2474 	/* preallocate.. if this fails, abort */
2475 	mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
2476 							 GFP_KERNEL);
2477 
2478 	if (!mle) {
2479 		ret = -ENOMEM;
2480 		goto leave;
2481 	}
2482 
2483 	/* check for pre-existing lock */
2484 	spin_lock(&dlm->spinlock);
2485 	res = __dlm_lookup_lockres(dlm, name, namelen);
2486 	spin_lock(&dlm->master_lock);
2487 
2488 	if (res) {
2489 		spin_lock(&res->spinlock);
2490 		if (res->state & DLM_LOCK_RES_RECOVERING) {
2491 			/* if all is working ok, this can only mean that we got
2492 			 * a migrate request from a node that we now see as
2493 			 * dead.  what can we do here?  drop it to the floor? */
2494 			spin_unlock(&res->spinlock);
2495 			mlog(ML_ERROR, "Got a migrate request, but the "
2496 			     "lockres is marked as recovering!");
2497 			kmem_cache_free(dlm_mle_cache, mle);
2498 			ret = -EINVAL; /* need a better solution */
2499 			goto unlock;
2500 		}
2501 		res->state |= DLM_LOCK_RES_MIGRATING;
2502 		spin_unlock(&res->spinlock);
2503 	}
2504 
2505 	/* ignore status.  only nonzero status would BUG. */
2506 	ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
2507 				    name, namelen,
2508 				    migrate->new_master,
2509 				    migrate->master);
2510 
2511 unlock:
2512 	spin_unlock(&dlm->master_lock);
2513 	spin_unlock(&dlm->spinlock);
2514 
2515 	if (oldmle) {
2516 		/* master is known, detach if not already detached */
2517 		dlm_mle_detach_hb_events(dlm, oldmle);
2518 		dlm_put_mle(oldmle);
2519 	}
2520 
2521 	if (res)
2522 		dlm_lockres_put(res);
2523 leave:
2524 	dlm_put(dlm);
2525 	return ret;
2526 }
2527 
2528 /* must be holding dlm->spinlock and dlm->master_lock
2529  * when adding a migration mle, we can clear any other mles
2530  * in the master list because we know with certainty that
2531  * the master is "master".  so we remove any old mle from
2532  * the list after setting its master field, and then add
2533  * the new migration mle.  this way we can keep to the rule
2534  * of having only one mle for a given lock name at all times. */
2535 static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
2536 				 struct dlm_lock_resource *res,
2537 				 struct dlm_master_list_entry *mle,
2538 				 struct dlm_master_list_entry **oldmle,
2539 				 const char *name, unsigned int namelen,
2540 				 u8 new_master, u8 master)
2541 {
2542 	int found;
2543 	int ret = 0;
2544 
2545 	*oldmle = NULL;
2546 
2547 	mlog_entry_void();
2548 
2549 	assert_spin_locked(&dlm->spinlock);
2550 	assert_spin_locked(&dlm->master_lock);
2551 
2552 	/* caller is responsible for any ref taken here on oldmle */
2553 	found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
2554 	if (found) {
2555 		struct dlm_master_list_entry *tmp = *oldmle;
2556 		spin_lock(&tmp->spinlock);
2557 		if (tmp->type == DLM_MLE_MIGRATION) {
2558 			if (master == dlm->node_num) {
2559 				/* ah another process raced me to it */
2560 				mlog(0, "tried to migrate %.*s, but some "
2561 				     "process beat me to it\n",
2562 				     namelen, name);
2563 				ret = -EEXIST;
2564 			} else {
2565 				/* bad.  2 NODES are trying to migrate! */
2566 				mlog(ML_ERROR, "migration error  mle: "
2567 				     "master=%u new_master=%u // request: "
2568 				     "master=%u new_master=%u // "
2569 				     "lockres=%.*s\n",
2570 				     tmp->master, tmp->new_master,
2571 				     master, new_master,
2572 				     namelen, name);
2573 				BUG();
2574 			}
2575 		} else {
2576 			/* this is essentially what assert_master does */
2577 			tmp->master = master;
2578 			atomic_set(&tmp->woken, 1);
2579 			wake_up(&tmp->wq);
2580 			/* remove it from the list so that only one
2581 			 * mle will be found */
2582 			list_del_init(&tmp->list);
2583 		}
2584 		spin_unlock(&tmp->spinlock);
2585 	}
2586 
2587 	/* now add a migration mle to the tail of the list */
2588 	dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
2589 	mle->new_master = new_master;
2590 	mle->master = master;
2591 	/* do this for consistency with other mle types */
2592 	set_bit(new_master, mle->maybe_map);
2593 	list_add(&mle->list, &dlm->master_list);
2594 
2595 	return ret;
2596 }
2597 
2598 
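/* Called with dlm->spinlock held when dead_node leaves the domain.
 * Walks dlm->master_list and cleans up mles that can no longer be
 * resolved: BLOCK mles whose expected master was the dead node are
 * woken and dropped, and MIGRATION mles naming the dead node as either
 * endpoint are removed, with any associated lockres handed to recovery
 * with an UNKNOWN owner. */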
2599 void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
2600 {
2601 	struct list_head *iter, *iter2;
2602 	struct dlm_master_list_entry *mle;
2603 	struct dlm_lock_resource *res;
2604 
2605 	mlog_entry("dlm=%s, dead node=%u\n", dlm->name, dead_node);
2606 top:
2607 	assert_spin_locked(&dlm->spinlock);
2608 
2609 	/* clean the master list */
2610 	spin_lock(&dlm->master_lock);
2611 	list_for_each_safe(iter, iter2, &dlm->master_list) {
2612 		mle = list_entry(iter, struct dlm_master_list_entry, list);
2613 
2614 		BUG_ON(mle->type != DLM_MLE_BLOCK &&
2615 		       mle->type != DLM_MLE_MASTER &&
2616 		       mle->type != DLM_MLE_MIGRATION);
2617 
2618 		/* MASTER mles are initiated locally.  the waiting
2619 		 * process will notice the node map change
2620 		 * shortly.  let that happen as normal. */
2621 		if (mle->type == DLM_MLE_MASTER)
2622 			continue;
2623 
2624 
2625 		/* BLOCK mles are initiated by other nodes.
2626 		 * need to clean up if the dead node would have
2627 		 * been the master. */
2628 		if (mle->type == DLM_MLE_BLOCK) {
2629 			int bit;
2630 
2631 			spin_lock(&mle->spinlock);
2632 			bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
2633 			if (bit != dead_node) {
2634 				mlog(0, "mle found, but dead node %u would "
2635 				     "not have been master\n", dead_node);
2636 				spin_unlock(&mle->spinlock);
2637 			} else {
2638 				/* must drop the refcount by one since the
2639 				 * assert_master will never arrive.  this
2640 				 * may result in the mle being unlinked and
2641 				 * freed, but there may still be a process
2642 				 * waiting in the dlmlock path which is fine. */
2643 				mlog(ML_ERROR, "node %u was expected master\n",
2644 				     dead_node);
2645 				atomic_set(&mle->woken, 1);
2646 				spin_unlock(&mle->spinlock);
2647 				wake_up(&mle->wq);
2648 				/* do not need events any longer, so detach
2649 				 * from heartbeat */
2650 				__dlm_mle_detach_hb_events(dlm, mle);
2651 				__dlm_put_mle(mle);
2652 			}
2653 			continue;
2654 		}
2655 
2656 		/* everything else is a MIGRATION mle */
2657 
2658 		/* the rule for MIGRATION mles is that the master
2659 		 * becomes UNKNOWN if *either* the original or
2660 		 * the new master dies.  all UNKNOWN lockreses
2661 		 * are sent to whichever node becomes the recovery
2662 		 * master.  the new master is responsible for
2663 		 * determining if there is still a master for
2664 		 * this lockres, or if he needs to take over
2665 		 * mastery.  either way, this node should expect
2666 		 * another message to resolve this. */
2667 		if (mle->master != dead_node &&
2668 		    mle->new_master != dead_node)
2669 			continue;
2670 
2671 		/* if we have reached this point, this mle needs to
2672 		 * be removed from the list and freed. */
2673 
2674 		/* remove from the list early.  NOTE: unlinking
2675 		 * list_head while in list_for_each_safe */
2676 		spin_lock(&mle->spinlock);
2677 		list_del_init(&mle->list);
2678 		atomic_set(&mle->woken, 1);
2679 		spin_unlock(&mle->spinlock);
2680 		wake_up(&mle->wq);
2681 
2682 		mlog(0, "node %u died during migration from "
2683 		     "%u to %u!\n", dead_node,
2684 		     mle->master, mle->new_master);
2685 		/* if there is a lockres associated with this
2686 		 * mle, find it and set its owner to UNKNOWN */
2687 		res = __dlm_lookup_lockres(dlm, mle->u.name.name,
2688 					mle->u.name.len);
2689 		if (res) {
2690 			/* unfortunately if we hit this rare case, our
2691 			 * lock ordering is messed up.  we need to drop
2692 			 * the master lock so that we can take the
2693 			 * lockres lock, meaning that we will have to
2694 			 * restart from the head of the list. */
2695 			spin_unlock(&dlm->master_lock);
2696 
2697 			/* move lockres onto recovery list */
2698 			spin_lock(&res->spinlock);
2699 			dlm_set_lockres_owner(dlm, res,
2700 				      	DLM_LOCK_RES_OWNER_UNKNOWN);
2701 			dlm_move_lockres_to_recovery_list(dlm, res);
2702 			spin_unlock(&res->spinlock);
2703 			dlm_lockres_put(res);
2704 
2705 			/* about to get rid of mle, detach from heartbeat */
2706 			__dlm_mle_detach_hb_events(dlm, mle);
2707 
2708 			/* dump the mle */
2709 			spin_lock(&dlm->master_lock);
2710 			__dlm_put_mle(mle);
2711 			spin_unlock(&dlm->master_lock);
2712 
2713 			/* restart */
2714 			goto top;
2715 		}
2716 
2717 		/* this may be the last reference */
2718 		__dlm_put_mle(mle);
2719 	}
2720 	spin_unlock(&dlm->master_lock);
2721 }
2722 
2723 
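/* Runs on the new master once all migrated lock state has arrived.  It
 * announces the ownership change to every other node with
 * dlm_do_migrate_request(), asserts mastery to all nodes except the old
 * master, then asserts to the old master separately, and finally takes
 * ownership locally and clears DLM_LOCK_RES_MIGRATING. */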
2724 int dlm_finish_migration(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
2725 			 u8 old_master)
2726 {
2727 	struct dlm_node_iter iter;
2728 	int ret = 0;
2729 
2730 	spin_lock(&dlm->spinlock);
2731 	dlm_node_iter_init(dlm->domain_map, &iter);
2732 	clear_bit(old_master, iter.node_map);
2733 	clear_bit(dlm->node_num, iter.node_map);
2734 	spin_unlock(&dlm->spinlock);
2735 
2736 	mlog(0, "now time to do a migrate request to other nodes\n");
2737 	ret = dlm_do_migrate_request(dlm, res, old_master,
2738 				     dlm->node_num, &iter);
2739 	if (ret < 0) {
2740 		mlog_errno(ret);
2741 		goto leave;
2742 	}
2743 
2744 	mlog(0, "doing assert master of %.*s to all except the original node\n",
2745 	     res->lockname.len, res->lockname.name);
2746 	/* this call now finishes out the nodemap
2747 	 * even if one or more nodes die */
2748 	ret = dlm_do_assert_master(dlm, res->lockname.name,
2749 				   res->lockname.len, iter.node_map,
2750 				   DLM_ASSERT_MASTER_FINISH_MIGRATION);
2751 	if (ret < 0) {
2752 		/* no longer need to retry.  all living nodes contacted. */
2753 		mlog_errno(ret);
2754 		ret = 0;
2755 	}
2756 
2757 	memset(iter.node_map, 0, sizeof(iter.node_map));
2758 	set_bit(old_master, iter.node_map);
2759 	mlog(0, "doing assert master of %.*s back to %u\n",
2760 	     res->lockname.len, res->lockname.name, old_master);
2761 	ret = dlm_do_assert_master(dlm, res->lockname.name,
2762 				   res->lockname.len, iter.node_map,
2763 				   DLM_ASSERT_MASTER_FINISH_MIGRATION);
2764 	if (ret < 0) {
2765 		mlog(0, "assert master to original master failed "
2766 		     "with %d.\n", ret);
2767 		/* the only nonzero status here would be because of
2768 		 * a dead original node.  we're done. */
2769 		ret = 0;
2770 	}
2771 
2772 	/* all done, set the owner, clear the flag */
2773 	spin_lock(&res->spinlock);
2774 	dlm_set_lockres_owner(dlm, res, dlm->node_num);
2775 	res->state &= ~DLM_LOCK_RES_MIGRATING;
2776 	spin_unlock(&res->spinlock);
2777 	/* re-dirty it on the new master */
2778 	dlm_kick_thread(dlm, res);
2779 	wake_up(&res->wq);
2780 leave:
2781 	return ret;
2782 }
2783 
2784 /*
2785  * LOCKRES AST REFCOUNT
2786  * this is integral to migration
2787  */
2788 
2789 /* for future intent to call an ast, reserve one ahead of time.
2790  * this should be called only after waiting on the lockres
2791  * with dlm_wait_on_lockres, and while still holding the
2792  * spinlock after the call. */
2793 void __dlm_lockres_reserve_ast(struct dlm_lock_resource *res)
2794 {
2795 	assert_spin_locked(&res->spinlock);
2796 	if (res->state & DLM_LOCK_RES_MIGRATING) {
2797 		__dlm_print_one_lock_resource(res);
2798 	}
2799 	BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
2800 
2801 	atomic_inc(&res->asts_reserved);
2802 }
2803 
2804 /*
2805  * used to drop the reserved ast, either because it went unused,
2806  * or because the ast/bast was actually called.
2807  *
2808  * also, if there is a pending migration on this lockres,
2809  * and this was the last pending ast on the lockres,
2810  * atomically set the MIGRATING flag before we drop the lock.
2811  * this is how we ensure that migration can proceed with no
2812  * asts in progress.  note that it is ok if the state of the
2813  * queues is such that a lock should be granted in the future
2814  * or that a bast should be fired, because the new master will
2815  * shuffle the lists on this lockres as soon as it is migrated.
2816  */
2817 void dlm_lockres_release_ast(struct dlm_ctxt *dlm,
2818 			     struct dlm_lock_resource *res)
2819 {
2820 	if (!atomic_dec_and_lock(&res->asts_reserved, &res->spinlock))
2821 		return;
2822 
2823 	if (!res->migration_pending) {
2824 		spin_unlock(&res->spinlock);
2825 		return;
2826 	}
2827 
2828 	BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
2829 	res->migration_pending = 0;
2830 	res->state |= DLM_LOCK_RES_MIGRATING;
2831 	spin_unlock(&res->spinlock);
2832 	wake_up(&res->wq);
2833 	wake_up(&dlm->migration_wq);
2834 }
2835