xref: /linux/fs/ocfs2/dlm/dlmdomain.c (revision d39d0ed196aa1685bb24771e92f78633c66ac9cb)
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * dlmdomain.c
5  *
6  * defines domain join / leave apis
7  *
8  * Copyright (C) 2004 Oracle.  All rights reserved.
9  *
10  * This program is free software; you can redistribute it and/or
11  * modify it under the terms of the GNU General Public
12  * License as published by the Free Software Foundation; either
13  * version 2 of the License, or (at your option) any later version.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18  * General Public License for more details.
19  *
20  * You should have received a copy of the GNU General Public
21  * License along with this program; if not, write to the
22  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23  * Boston, MA 021110-1307, USA.
24  *
25  */
26 
27 #include <linux/module.h>
28 #include <linux/types.h>
29 #include <linux/slab.h>
30 #include <linux/highmem.h>
31 #include <linux/init.h>
32 #include <linux/spinlock.h>
33 #include <linux/delay.h>
34 #include <linux/err.h>
35 #include <linux/debugfs.h>
36 
37 #include "cluster/heartbeat.h"
38 #include "cluster/nodemanager.h"
39 #include "cluster/tcp.h"
40 
41 #include "dlmapi.h"
42 #include "dlmcommon.h"
43 #include "dlmdomain.h"
44 #include "dlmdebug.h"
45 
46 #include "dlmver.h"
47 
48 #define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_DOMAIN)
49 #include "cluster/masklog.h"
50 
51 /*
52  * ocfs2 node maps are array of long int, which limits to send them freely
53  * across the wire due to endianness issues. To workaround this, we convert
54  * long ints to byte arrays. Following 3 routines are helper functions to
55  * set/test/copy bits within those array of bytes
56  */
57 static inline void byte_set_bit(u8 nr, u8 map[])
58 {
59 	map[nr >> 3] |= (1UL << (nr & 7));
60 }
61 
62 static inline int byte_test_bit(u8 nr, u8 map[])
63 {
64 	return ((1UL << (nr & 7)) & (map[nr >> 3])) != 0;
65 }
66 
67 static inline void byte_copymap(u8 dmap[], unsigned long smap[],
68 			unsigned int sz)
69 {
70 	unsigned int nn;
71 
72 	if (!sz)
73 		return;
74 
75 	memset(dmap, 0, ((sz + 7) >> 3));
76 	for (nn = 0 ; nn < sz; nn++)
77 		if (test_bit(nn, smap))
78 			byte_set_bit(nn, dmap);
79 }
80 
81 static void dlm_free_pagevec(void **vec, int pages)
82 {
83 	while (pages--)
84 		free_page((unsigned long)vec[pages]);
85 	kfree(vec);
86 }
87 
88 static void **dlm_alloc_pagevec(int pages)
89 {
90 	void **vec = kmalloc(pages * sizeof(void *), GFP_KERNEL);
91 	int i;
92 
93 	if (!vec)
94 		return NULL;
95 
96 	for (i = 0; i < pages; i++)
97 		if (!(vec[i] = (void *)__get_free_page(GFP_KERNEL)))
98 			goto out_free;
99 
100 	mlog(0, "Allocated DLM hash pagevec; %d pages (%lu expected), %lu buckets per page\n",
101 	     pages, (unsigned long)DLM_HASH_PAGES,
102 	     (unsigned long)DLM_BUCKETS_PER_PAGE);
103 	return vec;
104 out_free:
105 	dlm_free_pagevec(vec, i);
106 	return NULL;
107 }
108 
109 /*
110  *
111  * spinlock lock ordering: if multiple locks are needed, obey this ordering:
112  *    dlm_domain_lock
113  *    struct dlm_ctxt->spinlock
114  *    struct dlm_lock_resource->spinlock
115  *    struct dlm_ctxt->master_lock
116  *    struct dlm_ctxt->ast_lock
117  *    dlm_master_list_entry->spinlock
118  *    dlm_lock->spinlock
119  *
120  */
121 
122 DEFINE_SPINLOCK(dlm_domain_lock);
123 LIST_HEAD(dlm_domains);
124 static DECLARE_WAIT_QUEUE_HEAD(dlm_domain_events);
125 
126 /*
127  * The supported protocol version for DLM communication.  Running domains
128  * will have a negotiated version with the same major number and a minor
129  * number equal or smaller.  The dlm_ctxt->dlm_locking_proto field should
130  * be used to determine what a running domain is actually using.
131  */
132 static const struct dlm_protocol_version dlm_protocol = {
133 	.pv_major = 1,
134 	.pv_minor = 0,
135 };
136 
137 #define DLM_DOMAIN_BACKOFF_MS 200
138 
139 static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
140 				  void **ret_data);
141 static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
142 				     void **ret_data);
143 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
144 				   void **ret_data);
145 static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
146 				   void **ret_data);
147 static int dlm_protocol_compare(struct dlm_protocol_version *existing,
148 				struct dlm_protocol_version *request);
149 
150 static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm);
151 
152 void __dlm_unhash_lockres(struct dlm_lock_resource *lockres)
153 {
154 	if (!hlist_unhashed(&lockres->hash_node)) {
155 		hlist_del_init(&lockres->hash_node);
156 		dlm_lockres_put(lockres);
157 	}
158 }
159 
160 void __dlm_insert_lockres(struct dlm_ctxt *dlm,
161 		       struct dlm_lock_resource *res)
162 {
163 	struct hlist_head *bucket;
164 	struct qstr *q;
165 
166 	assert_spin_locked(&dlm->spinlock);
167 
168 	q = &res->lockname;
169 	bucket = dlm_lockres_hash(dlm, q->hash);
170 
171 	/* get a reference for our hashtable */
172 	dlm_lockres_get(res);
173 
174 	hlist_add_head(&res->hash_node, bucket);
175 }
176 
177 struct dlm_lock_resource * __dlm_lookup_lockres_full(struct dlm_ctxt *dlm,
178 						     const char *name,
179 						     unsigned int len,
180 						     unsigned int hash)
181 {
182 	struct hlist_head *bucket;
183 	struct hlist_node *list;
184 
185 	mlog_entry("%.*s\n", len, name);
186 
187 	assert_spin_locked(&dlm->spinlock);
188 
189 	bucket = dlm_lockres_hash(dlm, hash);
190 
191 	hlist_for_each(list, bucket) {
192 		struct dlm_lock_resource *res = hlist_entry(list,
193 			struct dlm_lock_resource, hash_node);
194 		if (res->lockname.name[0] != name[0])
195 			continue;
196 		if (unlikely(res->lockname.len != len))
197 			continue;
198 		if (memcmp(res->lockname.name + 1, name + 1, len - 1))
199 			continue;
200 		dlm_lockres_get(res);
201 		return res;
202 	}
203 	return NULL;
204 }
205 
206 /* intended to be called by functions which do not care about lock
207  * resources which are being purged (most net _handler functions).
208  * this will return NULL for any lock resource which is found but
209  * currently in the process of dropping its mastery reference.
210  * use __dlm_lookup_lockres_full when you need the lock resource
211  * regardless (e.g. dlm_get_lock_resource) */
212 struct dlm_lock_resource * __dlm_lookup_lockres(struct dlm_ctxt *dlm,
213 						const char *name,
214 						unsigned int len,
215 						unsigned int hash)
216 {
217 	struct dlm_lock_resource *res = NULL;
218 
219 	mlog_entry("%.*s\n", len, name);
220 
221 	assert_spin_locked(&dlm->spinlock);
222 
223 	res = __dlm_lookup_lockres_full(dlm, name, len, hash);
224 	if (res) {
225 		spin_lock(&res->spinlock);
226 		if (res->state & DLM_LOCK_RES_DROPPING_REF) {
227 			spin_unlock(&res->spinlock);
228 			dlm_lockres_put(res);
229 			return NULL;
230 		}
231 		spin_unlock(&res->spinlock);
232 	}
233 
234 	return res;
235 }
236 
237 struct dlm_lock_resource * dlm_lookup_lockres(struct dlm_ctxt *dlm,
238 				    const char *name,
239 				    unsigned int len)
240 {
241 	struct dlm_lock_resource *res;
242 	unsigned int hash = dlm_lockid_hash(name, len);
243 
244 	spin_lock(&dlm->spinlock);
245 	res = __dlm_lookup_lockres(dlm, name, len, hash);
246 	spin_unlock(&dlm->spinlock);
247 	return res;
248 }
249 
250 static struct dlm_ctxt * __dlm_lookup_domain_full(const char *domain, int len)
251 {
252 	struct dlm_ctxt *tmp = NULL;
253 	struct list_head *iter;
254 
255 	assert_spin_locked(&dlm_domain_lock);
256 
257 	/* tmp->name here is always NULL terminated,
258 	 * but domain may not be! */
259 	list_for_each(iter, &dlm_domains) {
260 		tmp = list_entry (iter, struct dlm_ctxt, list);
261 		if (strlen(tmp->name) == len &&
262 		    memcmp(tmp->name, domain, len)==0)
263 			break;
264 		tmp = NULL;
265 	}
266 
267 	return tmp;
268 }
269 
270 /* For null terminated domain strings ONLY */
271 static struct dlm_ctxt * __dlm_lookup_domain(const char *domain)
272 {
273 	assert_spin_locked(&dlm_domain_lock);
274 
275 	return __dlm_lookup_domain_full(domain, strlen(domain));
276 }
277 
278 
279 /* returns true on one of two conditions:
280  * 1) the domain does not exist
281  * 2) the domain exists and it's state is "joined" */
282 static int dlm_wait_on_domain_helper(const char *domain)
283 {
284 	int ret = 0;
285 	struct dlm_ctxt *tmp = NULL;
286 
287 	spin_lock(&dlm_domain_lock);
288 
289 	tmp = __dlm_lookup_domain(domain);
290 	if (!tmp)
291 		ret = 1;
292 	else if (tmp->dlm_state == DLM_CTXT_JOINED)
293 		ret = 1;
294 
295 	spin_unlock(&dlm_domain_lock);
296 	return ret;
297 }
298 
299 static void dlm_free_ctxt_mem(struct dlm_ctxt *dlm)
300 {
301 	dlm_destroy_debugfs_subroot(dlm);
302 
303 	if (dlm->lockres_hash)
304 		dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES);
305 
306 	if (dlm->master_hash)
307 		dlm_free_pagevec((void **)dlm->master_hash, DLM_HASH_PAGES);
308 
309 	if (dlm->name)
310 		kfree(dlm->name);
311 
312 	kfree(dlm);
313 }
314 
315 /* A little strange - this function will be called while holding
316  * dlm_domain_lock and is expected to be holding it on the way out. We
317  * will however drop and reacquire it multiple times */
318 static void dlm_ctxt_release(struct kref *kref)
319 {
320 	struct dlm_ctxt *dlm;
321 
322 	dlm = container_of(kref, struct dlm_ctxt, dlm_refs);
323 
324 	BUG_ON(dlm->num_joins);
325 	BUG_ON(dlm->dlm_state == DLM_CTXT_JOINED);
326 
327 	/* we may still be in the list if we hit an error during join. */
328 	list_del_init(&dlm->list);
329 
330 	spin_unlock(&dlm_domain_lock);
331 
332 	mlog(0, "freeing memory from domain %s\n", dlm->name);
333 
334 	wake_up(&dlm_domain_events);
335 
336 	dlm_free_ctxt_mem(dlm);
337 
338 	spin_lock(&dlm_domain_lock);
339 }
340 
341 void dlm_put(struct dlm_ctxt *dlm)
342 {
343 	spin_lock(&dlm_domain_lock);
344 	kref_put(&dlm->dlm_refs, dlm_ctxt_release);
345 	spin_unlock(&dlm_domain_lock);
346 }
347 
348 static void __dlm_get(struct dlm_ctxt *dlm)
349 {
350 	kref_get(&dlm->dlm_refs);
351 }
352 
353 /* given a questionable reference to a dlm object, gets a reference if
354  * it can find it in the list, otherwise returns NULL in which case
355  * you shouldn't trust your pointer. */
356 struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm)
357 {
358 	struct list_head *iter;
359 	struct dlm_ctxt *target = NULL;
360 
361 	spin_lock(&dlm_domain_lock);
362 
363 	list_for_each(iter, &dlm_domains) {
364 		target = list_entry (iter, struct dlm_ctxt, list);
365 
366 		if (target == dlm) {
367 			__dlm_get(target);
368 			break;
369 		}
370 
371 		target = NULL;
372 	}
373 
374 	spin_unlock(&dlm_domain_lock);
375 
376 	return target;
377 }
378 
379 int dlm_domain_fully_joined(struct dlm_ctxt *dlm)
380 {
381 	int ret;
382 
383 	spin_lock(&dlm_domain_lock);
384 	ret = (dlm->dlm_state == DLM_CTXT_JOINED) ||
385 		(dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN);
386 	spin_unlock(&dlm_domain_lock);
387 
388 	return ret;
389 }
390 
391 static void dlm_destroy_dlm_worker(struct dlm_ctxt *dlm)
392 {
393 	if (dlm->dlm_worker) {
394 		flush_workqueue(dlm->dlm_worker);
395 		destroy_workqueue(dlm->dlm_worker);
396 		dlm->dlm_worker = NULL;
397 	}
398 }
399 
400 static void dlm_complete_dlm_shutdown(struct dlm_ctxt *dlm)
401 {
402 	dlm_unregister_domain_handlers(dlm);
403 	dlm_debug_shutdown(dlm);
404 	dlm_complete_thread(dlm);
405 	dlm_complete_recovery_thread(dlm);
406 	dlm_destroy_dlm_worker(dlm);
407 
408 	/* We've left the domain. Now we can take ourselves out of the
409 	 * list and allow the kref stuff to help us free the
410 	 * memory. */
411 	spin_lock(&dlm_domain_lock);
412 	list_del_init(&dlm->list);
413 	spin_unlock(&dlm_domain_lock);
414 
415 	/* Wake up anyone waiting for us to remove this domain */
416 	wake_up(&dlm_domain_events);
417 }
418 
419 static int dlm_migrate_all_locks(struct dlm_ctxt *dlm)
420 {
421 	int i, num, n, ret = 0;
422 	struct dlm_lock_resource *res;
423 	struct hlist_node *iter;
424 	struct hlist_head *bucket;
425 	int dropped;
426 
427 	mlog(0, "Migrating locks from domain %s\n", dlm->name);
428 
429 	num = 0;
430 	spin_lock(&dlm->spinlock);
431 	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
432 redo_bucket:
433 		n = 0;
434 		bucket = dlm_lockres_hash(dlm, i);
435 		iter = bucket->first;
436 		while (iter) {
437 			n++;
438 			res = hlist_entry(iter, struct dlm_lock_resource,
439 					  hash_node);
440 			dlm_lockres_get(res);
441 			/* migrate, if necessary.  this will drop the dlm
442 			 * spinlock and retake it if it does migration. */
443 			dropped = dlm_empty_lockres(dlm, res);
444 
445 			spin_lock(&res->spinlock);
446 			__dlm_lockres_calc_usage(dlm, res);
447 			iter = res->hash_node.next;
448 			spin_unlock(&res->spinlock);
449 
450 			dlm_lockres_put(res);
451 
452 			if (dropped)
453 				goto redo_bucket;
454 		}
455 		cond_resched_lock(&dlm->spinlock);
456 		num += n;
457 		mlog(0, "%s: touched %d lockreses in bucket %d "
458 		     "(tot=%d)\n", dlm->name, n, i, num);
459 	}
460 	spin_unlock(&dlm->spinlock);
461 	wake_up(&dlm->dlm_thread_wq);
462 
463 	/* let the dlm thread take care of purging, keep scanning until
464 	 * nothing remains in the hash */
465 	if (num) {
466 		mlog(0, "%s: %d lock resources in hash last pass\n",
467 		     dlm->name, num);
468 		ret = -EAGAIN;
469 	}
470 	mlog(0, "DONE Migrating locks from domain %s\n", dlm->name);
471 	return ret;
472 }
473 
474 static int dlm_no_joining_node(struct dlm_ctxt *dlm)
475 {
476 	int ret;
477 
478 	spin_lock(&dlm->spinlock);
479 	ret = dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN;
480 	spin_unlock(&dlm->spinlock);
481 
482 	return ret;
483 }
484 
485 static void dlm_mark_domain_leaving(struct dlm_ctxt *dlm)
486 {
487 	/* Yikes, a double spinlock! I need domain_lock for the dlm
488 	 * state and the dlm spinlock for join state... Sorry! */
489 again:
490 	spin_lock(&dlm_domain_lock);
491 	spin_lock(&dlm->spinlock);
492 
493 	if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
494 		mlog(0, "Node %d is joining, we wait on it.\n",
495 			  dlm->joining_node);
496 		spin_unlock(&dlm->spinlock);
497 		spin_unlock(&dlm_domain_lock);
498 
499 		wait_event(dlm->dlm_join_events, dlm_no_joining_node(dlm));
500 		goto again;
501 	}
502 
503 	dlm->dlm_state = DLM_CTXT_LEAVING;
504 	spin_unlock(&dlm->spinlock);
505 	spin_unlock(&dlm_domain_lock);
506 }
507 
508 static void __dlm_print_nodes(struct dlm_ctxt *dlm)
509 {
510 	int node = -1;
511 
512 	assert_spin_locked(&dlm->spinlock);
513 
514 	printk(KERN_NOTICE "o2dlm: Nodes in domain %s: ", dlm->name);
515 
516 	while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
517 				     node + 1)) < O2NM_MAX_NODES) {
518 		printk("%d ", node);
519 	}
520 	printk("\n");
521 }
522 
523 static int dlm_exit_domain_handler(struct o2net_msg *msg, u32 len, void *data,
524 				   void **ret_data)
525 {
526 	struct dlm_ctxt *dlm = data;
527 	unsigned int node;
528 	struct dlm_exit_domain *exit_msg = (struct dlm_exit_domain *) msg->buf;
529 
530 	mlog_entry("%p %u %p", msg, len, data);
531 
532 	if (!dlm_grab(dlm))
533 		return 0;
534 
535 	node = exit_msg->node_idx;
536 
537 	printk(KERN_NOTICE "o2dlm: Node %u leaves domain %s\n", node, dlm->name);
538 
539 	spin_lock(&dlm->spinlock);
540 	clear_bit(node, dlm->domain_map);
541 	__dlm_print_nodes(dlm);
542 
543 	/* notify anything attached to the heartbeat events */
544 	dlm_hb_event_notify_attached(dlm, node, 0);
545 
546 	spin_unlock(&dlm->spinlock);
547 
548 	dlm_put(dlm);
549 
550 	return 0;
551 }
552 
553 static int dlm_send_one_domain_exit(struct dlm_ctxt *dlm,
554 				    unsigned int node)
555 {
556 	int status;
557 	struct dlm_exit_domain leave_msg;
558 
559 	mlog(0, "Asking node %u if we can leave the domain %s me = %u\n",
560 		  node, dlm->name, dlm->node_num);
561 
562 	memset(&leave_msg, 0, sizeof(leave_msg));
563 	leave_msg.node_idx = dlm->node_num;
564 
565 	status = o2net_send_message(DLM_EXIT_DOMAIN_MSG, dlm->key,
566 				    &leave_msg, sizeof(leave_msg), node,
567 				    NULL);
568 	if (status < 0)
569 		mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
570 		     "node %u\n", status, DLM_EXIT_DOMAIN_MSG, dlm->key, node);
571 	mlog(0, "status return %d from o2net_send_message\n", status);
572 
573 	return status;
574 }
575 
576 
577 static void dlm_leave_domain(struct dlm_ctxt *dlm)
578 {
579 	int node, clear_node, status;
580 
581 	/* At this point we've migrated away all our locks and won't
582 	 * accept mastership of new ones. The dlm is responsible for
583 	 * almost nothing now. We make sure not to confuse any joining
584 	 * nodes and then commence shutdown procedure. */
585 
586 	spin_lock(&dlm->spinlock);
587 	/* Clear ourselves from the domain map */
588 	clear_bit(dlm->node_num, dlm->domain_map);
589 	while ((node = find_next_bit(dlm->domain_map, O2NM_MAX_NODES,
590 				     0)) < O2NM_MAX_NODES) {
591 		/* Drop the dlm spinlock. This is safe wrt the domain_map.
592 		 * -nodes cannot be added now as the
593 		 *   query_join_handlers knows to respond with OK_NO_MAP
594 		 * -we catch the right network errors if a node is
595 		 *   removed from the map while we're sending him the
596 		 *   exit message. */
597 		spin_unlock(&dlm->spinlock);
598 
599 		clear_node = 1;
600 
601 		status = dlm_send_one_domain_exit(dlm, node);
602 		if (status < 0 &&
603 		    status != -ENOPROTOOPT &&
604 		    status != -ENOTCONN) {
605 			mlog(ML_NOTICE, "Error %d sending domain exit message "
606 			     "to node %d\n", status, node);
607 
608 			/* Not sure what to do here but lets sleep for
609 			 * a bit in case this was a transient
610 			 * error... */
611 			msleep(DLM_DOMAIN_BACKOFF_MS);
612 			clear_node = 0;
613 		}
614 
615 		spin_lock(&dlm->spinlock);
616 		/* If we're not clearing the node bit then we intend
617 		 * to loop back around to try again. */
618 		if (clear_node)
619 			clear_bit(node, dlm->domain_map);
620 	}
621 	spin_unlock(&dlm->spinlock);
622 }
623 
624 int dlm_joined(struct dlm_ctxt *dlm)
625 {
626 	int ret = 0;
627 
628 	spin_lock(&dlm_domain_lock);
629 
630 	if (dlm->dlm_state == DLM_CTXT_JOINED)
631 		ret = 1;
632 
633 	spin_unlock(&dlm_domain_lock);
634 
635 	return ret;
636 }
637 
638 int dlm_shutting_down(struct dlm_ctxt *dlm)
639 {
640 	int ret = 0;
641 
642 	spin_lock(&dlm_domain_lock);
643 
644 	if (dlm->dlm_state == DLM_CTXT_IN_SHUTDOWN)
645 		ret = 1;
646 
647 	spin_unlock(&dlm_domain_lock);
648 
649 	return ret;
650 }
651 
652 void dlm_unregister_domain(struct dlm_ctxt *dlm)
653 {
654 	int leave = 0;
655 	struct dlm_lock_resource *res;
656 
657 	spin_lock(&dlm_domain_lock);
658 	BUG_ON(dlm->dlm_state != DLM_CTXT_JOINED);
659 	BUG_ON(!dlm->num_joins);
660 
661 	dlm->num_joins--;
662 	if (!dlm->num_joins) {
663 		/* We mark it "in shutdown" now so new register
664 		 * requests wait until we've completely left the
665 		 * domain. Don't use DLM_CTXT_LEAVING yet as we still
666 		 * want new domain joins to communicate with us at
667 		 * least until we've completed migration of our
668 		 * resources. */
669 		dlm->dlm_state = DLM_CTXT_IN_SHUTDOWN;
670 		leave = 1;
671 	}
672 	spin_unlock(&dlm_domain_lock);
673 
674 	if (leave) {
675 		mlog(0, "shutting down domain %s\n", dlm->name);
676 
677 		/* We changed dlm state, notify the thread */
678 		dlm_kick_thread(dlm, NULL);
679 
680 		while (dlm_migrate_all_locks(dlm)) {
681 			/* Give dlm_thread time to purge the lockres' */
682 			msleep(500);
683 			mlog(0, "%s: more migration to do\n", dlm->name);
684 		}
685 
686 		/* This list should be empty. If not, print remaining lockres */
687 		if (!list_empty(&dlm->tracking_list)) {
688 			mlog(ML_ERROR, "Following lockres' are still on the "
689 			     "tracking list:\n");
690 			list_for_each_entry(res, &dlm->tracking_list, tracking)
691 				dlm_print_one_lock_resource(res);
692 		}
693 
694 		dlm_mark_domain_leaving(dlm);
695 		dlm_leave_domain(dlm);
696 		dlm_complete_dlm_shutdown(dlm);
697 	}
698 	dlm_put(dlm);
699 }
700 EXPORT_SYMBOL_GPL(dlm_unregister_domain);
701 
702 static int dlm_query_join_proto_check(char *proto_type, int node,
703 				      struct dlm_protocol_version *ours,
704 				      struct dlm_protocol_version *request)
705 {
706 	int rc;
707 	struct dlm_protocol_version proto = *request;
708 
709 	if (!dlm_protocol_compare(ours, &proto)) {
710 		mlog(0,
711 		     "node %u wanted to join with %s locking protocol "
712 		     "%u.%u, we respond with %u.%u\n",
713 		     node, proto_type,
714 		     request->pv_major,
715 		     request->pv_minor,
716 		     proto.pv_major, proto.pv_minor);
717 		request->pv_minor = proto.pv_minor;
718 		rc = 0;
719 	} else {
720 		mlog(ML_NOTICE,
721 		     "Node %u wanted to join with %s locking "
722 		     "protocol %u.%u, but we have %u.%u, disallowing\n",
723 		     node, proto_type,
724 		     request->pv_major,
725 		     request->pv_minor,
726 		     ours->pv_major,
727 		     ours->pv_minor);
728 		rc = 1;
729 	}
730 
731 	return rc;
732 }
733 
734 /*
735  * struct dlm_query_join_packet is made up of four one-byte fields.  They
736  * are effectively in big-endian order already.  However, little-endian
737  * machines swap them before putting the packet on the wire (because
738  * query_join's response is a status, and that status is treated as a u32
739  * on the wire).  Thus, a big-endian and little-endian machines will treat
740  * this structure differently.
741  *
742  * The solution is to have little-endian machines swap the structure when
743  * converting from the structure to the u32 representation.  This will
744  * result in the structure having the correct format on the wire no matter
745  * the host endian format.
746  */
747 static void dlm_query_join_packet_to_wire(struct dlm_query_join_packet *packet,
748 					  u32 *wire)
749 {
750 	union dlm_query_join_response response;
751 
752 	response.packet = *packet;
753 	*wire = cpu_to_be32(response.intval);
754 }
755 
756 static void dlm_query_join_wire_to_packet(u32 wire,
757 					  struct dlm_query_join_packet *packet)
758 {
759 	union dlm_query_join_response response;
760 
761 	response.intval = cpu_to_be32(wire);
762 	*packet = response.packet;
763 }
764 
765 static int dlm_query_join_handler(struct o2net_msg *msg, u32 len, void *data,
766 				  void **ret_data)
767 {
768 	struct dlm_query_join_request *query;
769 	struct dlm_query_join_packet packet = {
770 		.code = JOIN_DISALLOW,
771 	};
772 	struct dlm_ctxt *dlm = NULL;
773 	u32 response;
774 	u8 nodenum;
775 
776 	query = (struct dlm_query_join_request *) msg->buf;
777 
778 	mlog(0, "node %u wants to join domain %s\n", query->node_idx,
779 		  query->domain);
780 
781 	/*
782 	 * If heartbeat doesn't consider the node live, tell it
783 	 * to back off and try again.  This gives heartbeat a chance
784 	 * to catch up.
785 	 */
786 	if (!o2hb_check_node_heartbeating(query->node_idx)) {
787 		mlog(0, "node %u is not in our live map yet\n",
788 		     query->node_idx);
789 
790 		packet.code = JOIN_DISALLOW;
791 		goto respond;
792 	}
793 
794 	packet.code = JOIN_OK_NO_MAP;
795 
796 	spin_lock(&dlm_domain_lock);
797 	dlm = __dlm_lookup_domain_full(query->domain, query->name_len);
798 	if (!dlm)
799 		goto unlock_respond;
800 
801 	/*
802 	 * There is a small window where the joining node may not see the
803 	 * node(s) that just left but still part of the cluster. DISALLOW
804 	 * join request if joining node has different node map.
805 	 */
806 	nodenum=0;
807 	while (nodenum < O2NM_MAX_NODES) {
808 		if (test_bit(nodenum, dlm->domain_map)) {
809 			if (!byte_test_bit(nodenum, query->node_map)) {
810 				mlog(0, "disallow join as node %u does not "
811 				     "have node %u in its nodemap\n",
812 				     query->node_idx, nodenum);
813 				packet.code = JOIN_DISALLOW;
814 				goto unlock_respond;
815 			}
816 		}
817 		nodenum++;
818 	}
819 
820 	/* Once the dlm ctxt is marked as leaving then we don't want
821 	 * to be put in someone's domain map.
822 	 * Also, explicitly disallow joining at certain troublesome
823 	 * times (ie. during recovery). */
824 	if (dlm && dlm->dlm_state != DLM_CTXT_LEAVING) {
825 		int bit = query->node_idx;
826 		spin_lock(&dlm->spinlock);
827 
828 		if (dlm->dlm_state == DLM_CTXT_NEW &&
829 		    dlm->joining_node == DLM_LOCK_RES_OWNER_UNKNOWN) {
830 			/*If this is a brand new context and we
831 			 * haven't started our join process yet, then
832 			 * the other node won the race. */
833 			packet.code = JOIN_OK_NO_MAP;
834 		} else if (dlm->joining_node != DLM_LOCK_RES_OWNER_UNKNOWN) {
835 			/* Disallow parallel joins. */
836 			packet.code = JOIN_DISALLOW;
837 		} else if (dlm->reco.state & DLM_RECO_STATE_ACTIVE) {
838 			mlog(0, "node %u trying to join, but recovery "
839 			     "is ongoing.\n", bit);
840 			packet.code = JOIN_DISALLOW;
841 		} else if (test_bit(bit, dlm->recovery_map)) {
842 			mlog(0, "node %u trying to join, but it "
843 			     "still needs recovery.\n", bit);
844 			packet.code = JOIN_DISALLOW;
845 		} else if (test_bit(bit, dlm->domain_map)) {
846 			mlog(0, "node %u trying to join, but it "
847 			     "is still in the domain! needs recovery?\n",
848 			     bit);
849 			packet.code = JOIN_DISALLOW;
850 		} else {
851 			/* Alright we're fully a part of this domain
852 			 * so we keep some state as to who's joining
853 			 * and indicate to him that needs to be fixed
854 			 * up. */
855 
856 			/* Make sure we speak compatible locking protocols.  */
857 			if (dlm_query_join_proto_check("DLM", bit,
858 						       &dlm->dlm_locking_proto,
859 						       &query->dlm_proto)) {
860 				packet.code = JOIN_PROTOCOL_MISMATCH;
861 			} else if (dlm_query_join_proto_check("fs", bit,
862 							      &dlm->fs_locking_proto,
863 							      &query->fs_proto)) {
864 				packet.code = JOIN_PROTOCOL_MISMATCH;
865 			} else {
866 				packet.dlm_minor = query->dlm_proto.pv_minor;
867 				packet.fs_minor = query->fs_proto.pv_minor;
868 				packet.code = JOIN_OK;
869 				__dlm_set_joining_node(dlm, query->node_idx);
870 			}
871 		}
872 
873 		spin_unlock(&dlm->spinlock);
874 	}
875 unlock_respond:
876 	spin_unlock(&dlm_domain_lock);
877 
878 respond:
879 	mlog(0, "We respond with %u\n", packet.code);
880 
881 	dlm_query_join_packet_to_wire(&packet, &response);
882 	return response;
883 }
884 
885 static int dlm_assert_joined_handler(struct o2net_msg *msg, u32 len, void *data,
886 				     void **ret_data)
887 {
888 	struct dlm_assert_joined *assert;
889 	struct dlm_ctxt *dlm = NULL;
890 
891 	assert = (struct dlm_assert_joined *) msg->buf;
892 
893 	mlog(0, "node %u asserts join on domain %s\n", assert->node_idx,
894 		  assert->domain);
895 
896 	spin_lock(&dlm_domain_lock);
897 	dlm = __dlm_lookup_domain_full(assert->domain, assert->name_len);
898 	/* XXX should we consider no dlm ctxt an error? */
899 	if (dlm) {
900 		spin_lock(&dlm->spinlock);
901 
902 		/* Alright, this node has officially joined our
903 		 * domain. Set him in the map and clean up our
904 		 * leftover join state. */
905 		BUG_ON(dlm->joining_node != assert->node_idx);
906 		set_bit(assert->node_idx, dlm->domain_map);
907 		__dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
908 
909 		printk(KERN_NOTICE "o2dlm: Node %u joins domain %s\n",
910 		       assert->node_idx, dlm->name);
911 		__dlm_print_nodes(dlm);
912 
913 		/* notify anything attached to the heartbeat events */
914 		dlm_hb_event_notify_attached(dlm, assert->node_idx, 1);
915 
916 		spin_unlock(&dlm->spinlock);
917 	}
918 	spin_unlock(&dlm_domain_lock);
919 
920 	return 0;
921 }
922 
923 static int dlm_cancel_join_handler(struct o2net_msg *msg, u32 len, void *data,
924 				   void **ret_data)
925 {
926 	struct dlm_cancel_join *cancel;
927 	struct dlm_ctxt *dlm = NULL;
928 
929 	cancel = (struct dlm_cancel_join *) msg->buf;
930 
931 	mlog(0, "node %u cancels join on domain %s\n", cancel->node_idx,
932 		  cancel->domain);
933 
934 	spin_lock(&dlm_domain_lock);
935 	dlm = __dlm_lookup_domain_full(cancel->domain, cancel->name_len);
936 
937 	if (dlm) {
938 		spin_lock(&dlm->spinlock);
939 
940 		/* Yikes, this guy wants to cancel his join. No
941 		 * problem, we simply cleanup our join state. */
942 		BUG_ON(dlm->joining_node != cancel->node_idx);
943 		__dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
944 
945 		spin_unlock(&dlm->spinlock);
946 	}
947 	spin_unlock(&dlm_domain_lock);
948 
949 	return 0;
950 }
951 
952 static int dlm_send_one_join_cancel(struct dlm_ctxt *dlm,
953 				    unsigned int node)
954 {
955 	int status;
956 	struct dlm_cancel_join cancel_msg;
957 
958 	memset(&cancel_msg, 0, sizeof(cancel_msg));
959 	cancel_msg.node_idx = dlm->node_num;
960 	cancel_msg.name_len = strlen(dlm->name);
961 	memcpy(cancel_msg.domain, dlm->name, cancel_msg.name_len);
962 
963 	status = o2net_send_message(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
964 				    &cancel_msg, sizeof(cancel_msg), node,
965 				    NULL);
966 	if (status < 0) {
967 		mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
968 		     "node %u\n", status, DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
969 		     node);
970 		goto bail;
971 	}
972 
973 bail:
974 	return status;
975 }
976 
977 /* map_size should be in bytes. */
978 static int dlm_send_join_cancels(struct dlm_ctxt *dlm,
979 				 unsigned long *node_map,
980 				 unsigned int map_size)
981 {
982 	int status, tmpstat;
983 	unsigned int node;
984 
985 	if (map_size != (BITS_TO_LONGS(O2NM_MAX_NODES) *
986 			 sizeof(unsigned long))) {
987 		mlog(ML_ERROR,
988 		     "map_size %u != BITS_TO_LONGS(O2NM_MAX_NODES) %u\n",
989 		     map_size, (unsigned)BITS_TO_LONGS(O2NM_MAX_NODES));
990 		return -EINVAL;
991 	}
992 
993 	status = 0;
994 	node = -1;
995 	while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
996 				     node + 1)) < O2NM_MAX_NODES) {
997 		if (node == dlm->node_num)
998 			continue;
999 
1000 		tmpstat = dlm_send_one_join_cancel(dlm, node);
1001 		if (tmpstat) {
1002 			mlog(ML_ERROR, "Error return %d cancelling join on "
1003 			     "node %d\n", tmpstat, node);
1004 			if (!status)
1005 				status = tmpstat;
1006 		}
1007 	}
1008 
1009 	if (status)
1010 		mlog_errno(status);
1011 	return status;
1012 }
1013 
1014 static int dlm_request_join(struct dlm_ctxt *dlm,
1015 			    int node,
1016 			    enum dlm_query_join_response_code *response)
1017 {
1018 	int status;
1019 	struct dlm_query_join_request join_msg;
1020 	struct dlm_query_join_packet packet;
1021 	u32 join_resp;
1022 
1023 	mlog(0, "querying node %d\n", node);
1024 
1025 	memset(&join_msg, 0, sizeof(join_msg));
1026 	join_msg.node_idx = dlm->node_num;
1027 	join_msg.name_len = strlen(dlm->name);
1028 	memcpy(join_msg.domain, dlm->name, join_msg.name_len);
1029 	join_msg.dlm_proto = dlm->dlm_locking_proto;
1030 	join_msg.fs_proto = dlm->fs_locking_proto;
1031 
1032 	/* copy live node map to join message */
1033 	byte_copymap(join_msg.node_map, dlm->live_nodes_map, O2NM_MAX_NODES);
1034 
1035 	status = o2net_send_message(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY, &join_msg,
1036 				    sizeof(join_msg), node, &join_resp);
1037 	if (status < 0 && status != -ENOPROTOOPT) {
1038 		mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
1039 		     "node %u\n", status, DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
1040 		     node);
1041 		goto bail;
1042 	}
1043 	dlm_query_join_wire_to_packet(join_resp, &packet);
1044 
1045 	/* -ENOPROTOOPT from the net code means the other side isn't
1046 	    listening for our message type -- that's fine, it means
1047 	    his dlm isn't up, so we can consider him a 'yes' but not
1048 	    joined into the domain.  */
1049 	if (status == -ENOPROTOOPT) {
1050 		status = 0;
1051 		*response = JOIN_OK_NO_MAP;
1052 	} else if (packet.code == JOIN_DISALLOW ||
1053 		   packet.code == JOIN_OK_NO_MAP) {
1054 		*response = packet.code;
1055 	} else if (packet.code == JOIN_PROTOCOL_MISMATCH) {
1056 		mlog(ML_NOTICE,
1057 		     "This node requested DLM locking protocol %u.%u and "
1058 		     "filesystem locking protocol %u.%u.  At least one of "
1059 		     "the protocol versions on node %d is not compatible, "
1060 		     "disconnecting\n",
1061 		     dlm->dlm_locking_proto.pv_major,
1062 		     dlm->dlm_locking_proto.pv_minor,
1063 		     dlm->fs_locking_proto.pv_major,
1064 		     dlm->fs_locking_proto.pv_minor,
1065 		     node);
1066 		status = -EPROTO;
1067 		*response = packet.code;
1068 	} else if (packet.code == JOIN_OK) {
1069 		*response = packet.code;
1070 		/* Use the same locking protocol as the remote node */
1071 		dlm->dlm_locking_proto.pv_minor = packet.dlm_minor;
1072 		dlm->fs_locking_proto.pv_minor = packet.fs_minor;
1073 		mlog(0,
1074 		     "Node %d responds JOIN_OK with DLM locking protocol "
1075 		     "%u.%u and fs locking protocol %u.%u\n",
1076 		     node,
1077 		     dlm->dlm_locking_proto.pv_major,
1078 		     dlm->dlm_locking_proto.pv_minor,
1079 		     dlm->fs_locking_proto.pv_major,
1080 		     dlm->fs_locking_proto.pv_minor);
1081 	} else {
1082 		status = -EINVAL;
1083 		mlog(ML_ERROR, "invalid response %d from node %u\n",
1084 		     packet.code, node);
1085 	}
1086 
1087 	mlog(0, "status %d, node %d response is %d\n", status, node,
1088 	     *response);
1089 
1090 bail:
1091 	return status;
1092 }
1093 
1094 static int dlm_send_one_join_assert(struct dlm_ctxt *dlm,
1095 				    unsigned int node)
1096 {
1097 	int status;
1098 	struct dlm_assert_joined assert_msg;
1099 
1100 	mlog(0, "Sending join assert to node %u\n", node);
1101 
1102 	memset(&assert_msg, 0, sizeof(assert_msg));
1103 	assert_msg.node_idx = dlm->node_num;
1104 	assert_msg.name_len = strlen(dlm->name);
1105 	memcpy(assert_msg.domain, dlm->name, assert_msg.name_len);
1106 
1107 	status = o2net_send_message(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
1108 				    &assert_msg, sizeof(assert_msg), node,
1109 				    NULL);
1110 	if (status < 0)
1111 		mlog(ML_ERROR, "Error %d when sending message %u (key 0x%x) to "
1112 		     "node %u\n", status, DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
1113 		     node);
1114 
1115 	return status;
1116 }
1117 
1118 static void dlm_send_join_asserts(struct dlm_ctxt *dlm,
1119 				  unsigned long *node_map)
1120 {
1121 	int status, node, live;
1122 
1123 	status = 0;
1124 	node = -1;
1125 	while ((node = find_next_bit(node_map, O2NM_MAX_NODES,
1126 				     node + 1)) < O2NM_MAX_NODES) {
1127 		if (node == dlm->node_num)
1128 			continue;
1129 
1130 		do {
1131 			/* It is very important that this message be
1132 			 * received so we spin until either the node
1133 			 * has died or it gets the message. */
1134 			status = dlm_send_one_join_assert(dlm, node);
1135 
1136 			spin_lock(&dlm->spinlock);
1137 			live = test_bit(node, dlm->live_nodes_map);
1138 			spin_unlock(&dlm->spinlock);
1139 
1140 			if (status) {
1141 				mlog(ML_ERROR, "Error return %d asserting "
1142 				     "join on node %d\n", status, node);
1143 
1144 				/* give us some time between errors... */
1145 				if (live)
1146 					msleep(DLM_DOMAIN_BACKOFF_MS);
1147 			}
1148 		} while (status && live);
1149 	}
1150 }
1151 
1152 struct domain_join_ctxt {
1153 	unsigned long live_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1154 	unsigned long yes_resp_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1155 };
1156 
1157 static int dlm_should_restart_join(struct dlm_ctxt *dlm,
1158 				   struct domain_join_ctxt *ctxt,
1159 				   enum dlm_query_join_response_code response)
1160 {
1161 	int ret;
1162 
1163 	if (response == JOIN_DISALLOW) {
1164 		mlog(0, "Latest response of disallow -- should restart\n");
1165 		return 1;
1166 	}
1167 
1168 	spin_lock(&dlm->spinlock);
1169 	/* For now, we restart the process if the node maps have
1170 	 * changed at all */
1171 	ret = memcmp(ctxt->live_map, dlm->live_nodes_map,
1172 		     sizeof(dlm->live_nodes_map));
1173 	spin_unlock(&dlm->spinlock);
1174 
1175 	if (ret)
1176 		mlog(0, "Node maps changed -- should restart\n");
1177 
1178 	return ret;
1179 }
1180 
1181 static int dlm_try_to_join_domain(struct dlm_ctxt *dlm)
1182 {
1183 	int status = 0, tmpstat, node;
1184 	struct domain_join_ctxt *ctxt;
1185 	enum dlm_query_join_response_code response = JOIN_DISALLOW;
1186 
1187 	mlog_entry("%p", dlm);
1188 
1189 	ctxt = kzalloc(sizeof(*ctxt), GFP_KERNEL);
1190 	if (!ctxt) {
1191 		status = -ENOMEM;
1192 		mlog_errno(status);
1193 		goto bail;
1194 	}
1195 
1196 	/* group sem locking should work for us here -- we're already
1197 	 * registered for heartbeat events so filling this should be
1198 	 * atomic wrt getting those handlers called. */
1199 	o2hb_fill_node_map(dlm->live_nodes_map, sizeof(dlm->live_nodes_map));
1200 
1201 	spin_lock(&dlm->spinlock);
1202 	memcpy(ctxt->live_map, dlm->live_nodes_map, sizeof(ctxt->live_map));
1203 
1204 	__dlm_set_joining_node(dlm, dlm->node_num);
1205 
1206 	spin_unlock(&dlm->spinlock);
1207 
1208 	node = -1;
1209 	while ((node = find_next_bit(ctxt->live_map, O2NM_MAX_NODES,
1210 				     node + 1)) < O2NM_MAX_NODES) {
1211 		if (node == dlm->node_num)
1212 			continue;
1213 
1214 		status = dlm_request_join(dlm, node, &response);
1215 		if (status < 0) {
1216 			mlog_errno(status);
1217 			goto bail;
1218 		}
1219 
1220 		/* Ok, either we got a response or the node doesn't have a
1221 		 * dlm up. */
1222 		if (response == JOIN_OK)
1223 			set_bit(node, ctxt->yes_resp_map);
1224 
1225 		if (dlm_should_restart_join(dlm, ctxt, response)) {
1226 			status = -EAGAIN;
1227 			goto bail;
1228 		}
1229 	}
1230 
1231 	mlog(0, "Yay, done querying nodes!\n");
1232 
1233 	/* Yay, everyone agree's we can join the domain. My domain is
1234 	 * comprised of all nodes who were put in the
1235 	 * yes_resp_map. Copy that into our domain map and send a join
1236 	 * assert message to clean up everyone elses state. */
1237 	spin_lock(&dlm->spinlock);
1238 	memcpy(dlm->domain_map, ctxt->yes_resp_map,
1239 	       sizeof(ctxt->yes_resp_map));
1240 	set_bit(dlm->node_num, dlm->domain_map);
1241 	spin_unlock(&dlm->spinlock);
1242 
1243 	dlm_send_join_asserts(dlm, ctxt->yes_resp_map);
1244 
1245 	/* Joined state *must* be set before the joining node
1246 	 * information, otherwise the query_join handler may read no
1247 	 * current joiner but a state of NEW and tell joining nodes
1248 	 * we're not in the domain. */
1249 	spin_lock(&dlm_domain_lock);
1250 	dlm->dlm_state = DLM_CTXT_JOINED;
1251 	dlm->num_joins++;
1252 	spin_unlock(&dlm_domain_lock);
1253 
1254 bail:
1255 	spin_lock(&dlm->spinlock);
1256 	__dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
1257 	if (!status)
1258 		__dlm_print_nodes(dlm);
1259 	spin_unlock(&dlm->spinlock);
1260 
1261 	if (ctxt) {
1262 		/* Do we need to send a cancel message to any nodes? */
1263 		if (status < 0) {
1264 			tmpstat = dlm_send_join_cancels(dlm,
1265 							ctxt->yes_resp_map,
1266 							sizeof(ctxt->yes_resp_map));
1267 			if (tmpstat < 0)
1268 				mlog_errno(tmpstat);
1269 		}
1270 		kfree(ctxt);
1271 	}
1272 
1273 	mlog(0, "returning %d\n", status);
1274 	return status;
1275 }
1276 
1277 static void dlm_unregister_domain_handlers(struct dlm_ctxt *dlm)
1278 {
1279 	o2hb_unregister_callback(NULL, &dlm->dlm_hb_up);
1280 	o2hb_unregister_callback(NULL, &dlm->dlm_hb_down);
1281 	o2net_unregister_handler_list(&dlm->dlm_domain_handlers);
1282 }
1283 
1284 static int dlm_register_domain_handlers(struct dlm_ctxt *dlm)
1285 {
1286 	int status;
1287 
1288 	mlog(0, "registering handlers.\n");
1289 
1290 	o2hb_setup_callback(&dlm->dlm_hb_down, O2HB_NODE_DOWN_CB,
1291 			    dlm_hb_node_down_cb, dlm, DLM_HB_NODE_DOWN_PRI);
1292 	status = o2hb_register_callback(NULL, &dlm->dlm_hb_down);
1293 	if (status)
1294 		goto bail;
1295 
1296 	o2hb_setup_callback(&dlm->dlm_hb_up, O2HB_NODE_UP_CB,
1297 			    dlm_hb_node_up_cb, dlm, DLM_HB_NODE_UP_PRI);
1298 	status = o2hb_register_callback(NULL, &dlm->dlm_hb_up);
1299 	if (status)
1300 		goto bail;
1301 
1302 	status = o2net_register_handler(DLM_MASTER_REQUEST_MSG, dlm->key,
1303 					sizeof(struct dlm_master_request),
1304 					dlm_master_request_handler,
1305 					dlm, NULL, &dlm->dlm_domain_handlers);
1306 	if (status)
1307 		goto bail;
1308 
1309 	status = o2net_register_handler(DLM_ASSERT_MASTER_MSG, dlm->key,
1310 					sizeof(struct dlm_assert_master),
1311 					dlm_assert_master_handler,
1312 					dlm, dlm_assert_master_post_handler,
1313 					&dlm->dlm_domain_handlers);
1314 	if (status)
1315 		goto bail;
1316 
1317 	status = o2net_register_handler(DLM_CREATE_LOCK_MSG, dlm->key,
1318 					sizeof(struct dlm_create_lock),
1319 					dlm_create_lock_handler,
1320 					dlm, NULL, &dlm->dlm_domain_handlers);
1321 	if (status)
1322 		goto bail;
1323 
1324 	status = o2net_register_handler(DLM_CONVERT_LOCK_MSG, dlm->key,
1325 					DLM_CONVERT_LOCK_MAX_LEN,
1326 					dlm_convert_lock_handler,
1327 					dlm, NULL, &dlm->dlm_domain_handlers);
1328 	if (status)
1329 		goto bail;
1330 
1331 	status = o2net_register_handler(DLM_UNLOCK_LOCK_MSG, dlm->key,
1332 					DLM_UNLOCK_LOCK_MAX_LEN,
1333 					dlm_unlock_lock_handler,
1334 					dlm, NULL, &dlm->dlm_domain_handlers);
1335 	if (status)
1336 		goto bail;
1337 
1338 	status = o2net_register_handler(DLM_PROXY_AST_MSG, dlm->key,
1339 					DLM_PROXY_AST_MAX_LEN,
1340 					dlm_proxy_ast_handler,
1341 					dlm, NULL, &dlm->dlm_domain_handlers);
1342 	if (status)
1343 		goto bail;
1344 
1345 	status = o2net_register_handler(DLM_EXIT_DOMAIN_MSG, dlm->key,
1346 					sizeof(struct dlm_exit_domain),
1347 					dlm_exit_domain_handler,
1348 					dlm, NULL, &dlm->dlm_domain_handlers);
1349 	if (status)
1350 		goto bail;
1351 
1352 	status = o2net_register_handler(DLM_DEREF_LOCKRES_MSG, dlm->key,
1353 					sizeof(struct dlm_deref_lockres),
1354 					dlm_deref_lockres_handler,
1355 					dlm, NULL, &dlm->dlm_domain_handlers);
1356 	if (status)
1357 		goto bail;
1358 
1359 	status = o2net_register_handler(DLM_MIGRATE_REQUEST_MSG, dlm->key,
1360 					sizeof(struct dlm_migrate_request),
1361 					dlm_migrate_request_handler,
1362 					dlm, NULL, &dlm->dlm_domain_handlers);
1363 	if (status)
1364 		goto bail;
1365 
1366 	status = o2net_register_handler(DLM_MIG_LOCKRES_MSG, dlm->key,
1367 					DLM_MIG_LOCKRES_MAX_LEN,
1368 					dlm_mig_lockres_handler,
1369 					dlm, NULL, &dlm->dlm_domain_handlers);
1370 	if (status)
1371 		goto bail;
1372 
1373 	status = o2net_register_handler(DLM_MASTER_REQUERY_MSG, dlm->key,
1374 					sizeof(struct dlm_master_requery),
1375 					dlm_master_requery_handler,
1376 					dlm, NULL, &dlm->dlm_domain_handlers);
1377 	if (status)
1378 		goto bail;
1379 
1380 	status = o2net_register_handler(DLM_LOCK_REQUEST_MSG, dlm->key,
1381 					sizeof(struct dlm_lock_request),
1382 					dlm_request_all_locks_handler,
1383 					dlm, NULL, &dlm->dlm_domain_handlers);
1384 	if (status)
1385 		goto bail;
1386 
1387 	status = o2net_register_handler(DLM_RECO_DATA_DONE_MSG, dlm->key,
1388 					sizeof(struct dlm_reco_data_done),
1389 					dlm_reco_data_done_handler,
1390 					dlm, NULL, &dlm->dlm_domain_handlers);
1391 	if (status)
1392 		goto bail;
1393 
1394 	status = o2net_register_handler(DLM_BEGIN_RECO_MSG, dlm->key,
1395 					sizeof(struct dlm_begin_reco),
1396 					dlm_begin_reco_handler,
1397 					dlm, NULL, &dlm->dlm_domain_handlers);
1398 	if (status)
1399 		goto bail;
1400 
1401 	status = o2net_register_handler(DLM_FINALIZE_RECO_MSG, dlm->key,
1402 					sizeof(struct dlm_finalize_reco),
1403 					dlm_finalize_reco_handler,
1404 					dlm, NULL, &dlm->dlm_domain_handlers);
1405 	if (status)
1406 		goto bail;
1407 
1408 bail:
1409 	if (status)
1410 		dlm_unregister_domain_handlers(dlm);
1411 
1412 	return status;
1413 }
1414 
1415 static int dlm_join_domain(struct dlm_ctxt *dlm)
1416 {
1417 	int status;
1418 	unsigned int backoff;
1419 	unsigned int total_backoff = 0;
1420 
1421 	BUG_ON(!dlm);
1422 
1423 	mlog(0, "Join domain %s\n", dlm->name);
1424 
1425 	status = dlm_register_domain_handlers(dlm);
1426 	if (status) {
1427 		mlog_errno(status);
1428 		goto bail;
1429 	}
1430 
1431 	status = dlm_debug_init(dlm);
1432 	if (status < 0) {
1433 		mlog_errno(status);
1434 		goto bail;
1435 	}
1436 
1437 	status = dlm_launch_thread(dlm);
1438 	if (status < 0) {
1439 		mlog_errno(status);
1440 		goto bail;
1441 	}
1442 
1443 	status = dlm_launch_recovery_thread(dlm);
1444 	if (status < 0) {
1445 		mlog_errno(status);
1446 		goto bail;
1447 	}
1448 
1449 	dlm->dlm_worker = create_singlethread_workqueue("dlm_wq");
1450 	if (!dlm->dlm_worker) {
1451 		status = -ENOMEM;
1452 		mlog_errno(status);
1453 		goto bail;
1454 	}
1455 
1456 	do {
1457 		status = dlm_try_to_join_domain(dlm);
1458 
1459 		/* If we're racing another node to the join, then we
1460 		 * need to back off temporarily and let them
1461 		 * complete. */
1462 #define	DLM_JOIN_TIMEOUT_MSECS	90000
1463 		if (status == -EAGAIN) {
1464 			if (signal_pending(current)) {
1465 				status = -ERESTARTSYS;
1466 				goto bail;
1467 			}
1468 
1469 			if (total_backoff >
1470 			    msecs_to_jiffies(DLM_JOIN_TIMEOUT_MSECS)) {
1471 				status = -ERESTARTSYS;
1472 				mlog(ML_NOTICE, "Timed out joining dlm domain "
1473 				     "%s after %u msecs\n", dlm->name,
1474 				     jiffies_to_msecs(total_backoff));
1475 				goto bail;
1476 			}
1477 
1478 			/*
1479 			 * <chip> After you!
1480 			 * <dale> No, after you!
1481 			 * <chip> I insist!
1482 			 * <dale> But you first!
1483 			 * ...
1484 			 */
1485 			backoff = (unsigned int)(jiffies & 0x3);
1486 			backoff *= DLM_DOMAIN_BACKOFF_MS;
1487 			total_backoff += backoff;
1488 			mlog(0, "backoff %d\n", backoff);
1489 			msleep(backoff);
1490 		}
1491 	} while (status == -EAGAIN);
1492 
1493 	if (status < 0) {
1494 		mlog_errno(status);
1495 		goto bail;
1496 	}
1497 
1498 	status = 0;
1499 bail:
1500 	wake_up(&dlm_domain_events);
1501 
1502 	if (status) {
1503 		dlm_unregister_domain_handlers(dlm);
1504 		dlm_debug_shutdown(dlm);
1505 		dlm_complete_thread(dlm);
1506 		dlm_complete_recovery_thread(dlm);
1507 		dlm_destroy_dlm_worker(dlm);
1508 	}
1509 
1510 	return status;
1511 }
1512 
1513 static struct dlm_ctxt *dlm_alloc_ctxt(const char *domain,
1514 				u32 key)
1515 {
1516 	int i;
1517 	int ret;
1518 	struct dlm_ctxt *dlm = NULL;
1519 
1520 	dlm = kzalloc(sizeof(*dlm), GFP_KERNEL);
1521 	if (!dlm) {
1522 		mlog_errno(-ENOMEM);
1523 		goto leave;
1524 	}
1525 
1526 	dlm->name = kstrdup(domain, GFP_KERNEL);
1527 	if (dlm->name == NULL) {
1528 		mlog_errno(-ENOMEM);
1529 		kfree(dlm);
1530 		dlm = NULL;
1531 		goto leave;
1532 	}
1533 
1534 	dlm->lockres_hash = (struct hlist_head **)dlm_alloc_pagevec(DLM_HASH_PAGES);
1535 	if (!dlm->lockres_hash) {
1536 		mlog_errno(-ENOMEM);
1537 		kfree(dlm->name);
1538 		kfree(dlm);
1539 		dlm = NULL;
1540 		goto leave;
1541 	}
1542 
1543 	for (i = 0; i < DLM_HASH_BUCKETS; i++)
1544 		INIT_HLIST_HEAD(dlm_lockres_hash(dlm, i));
1545 
1546 	dlm->master_hash = (struct hlist_head **)
1547 				dlm_alloc_pagevec(DLM_HASH_PAGES);
1548 	if (!dlm->master_hash) {
1549 		mlog_errno(-ENOMEM);
1550 		dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES);
1551 		kfree(dlm->name);
1552 		kfree(dlm);
1553 		dlm = NULL;
1554 		goto leave;
1555 	}
1556 
1557 	for (i = 0; i < DLM_HASH_BUCKETS; i++)
1558 		INIT_HLIST_HEAD(dlm_master_hash(dlm, i));
1559 
1560 	dlm->key = key;
1561 	dlm->node_num = o2nm_this_node();
1562 
1563 	ret = dlm_create_debugfs_subroot(dlm);
1564 	if (ret < 0) {
1565 		dlm_free_pagevec((void **)dlm->master_hash, DLM_HASH_PAGES);
1566 		dlm_free_pagevec((void **)dlm->lockres_hash, DLM_HASH_PAGES);
1567 		kfree(dlm->name);
1568 		kfree(dlm);
1569 		dlm = NULL;
1570 		goto leave;
1571 	}
1572 
1573 	spin_lock_init(&dlm->spinlock);
1574 	spin_lock_init(&dlm->master_lock);
1575 	spin_lock_init(&dlm->ast_lock);
1576 	spin_lock_init(&dlm->track_lock);
1577 	INIT_LIST_HEAD(&dlm->list);
1578 	INIT_LIST_HEAD(&dlm->dirty_list);
1579 	INIT_LIST_HEAD(&dlm->reco.resources);
1580 	INIT_LIST_HEAD(&dlm->reco.received);
1581 	INIT_LIST_HEAD(&dlm->reco.node_data);
1582 	INIT_LIST_HEAD(&dlm->purge_list);
1583 	INIT_LIST_HEAD(&dlm->dlm_domain_handlers);
1584 	INIT_LIST_HEAD(&dlm->tracking_list);
1585 	dlm->reco.state = 0;
1586 
1587 	INIT_LIST_HEAD(&dlm->pending_asts);
1588 	INIT_LIST_HEAD(&dlm->pending_basts);
1589 
1590 	mlog(0, "dlm->recovery_map=%p, &(dlm->recovery_map[0])=%p\n",
1591 		  dlm->recovery_map, &(dlm->recovery_map[0]));
1592 
1593 	memset(dlm->recovery_map, 0, sizeof(dlm->recovery_map));
1594 	memset(dlm->live_nodes_map, 0, sizeof(dlm->live_nodes_map));
1595 	memset(dlm->domain_map, 0, sizeof(dlm->domain_map));
1596 
1597 	dlm->dlm_thread_task = NULL;
1598 	dlm->dlm_reco_thread_task = NULL;
1599 	dlm->dlm_worker = NULL;
1600 	init_waitqueue_head(&dlm->dlm_thread_wq);
1601 	init_waitqueue_head(&dlm->dlm_reco_thread_wq);
1602 	init_waitqueue_head(&dlm->reco.event);
1603 	init_waitqueue_head(&dlm->ast_wq);
1604 	init_waitqueue_head(&dlm->migration_wq);
1605 	INIT_LIST_HEAD(&dlm->mle_hb_events);
1606 
1607 	dlm->joining_node = DLM_LOCK_RES_OWNER_UNKNOWN;
1608 	init_waitqueue_head(&dlm->dlm_join_events);
1609 
1610 	dlm->reco.new_master = O2NM_INVALID_NODE_NUM;
1611 	dlm->reco.dead_node = O2NM_INVALID_NODE_NUM;
1612 
1613 	atomic_set(&dlm->res_tot_count, 0);
1614 	atomic_set(&dlm->res_cur_count, 0);
1615 	for (i = 0; i < DLM_MLE_NUM_TYPES; ++i) {
1616 		atomic_set(&dlm->mle_tot_count[i], 0);
1617 		atomic_set(&dlm->mle_cur_count[i], 0);
1618 	}
1619 
1620 	spin_lock_init(&dlm->work_lock);
1621 	INIT_LIST_HEAD(&dlm->work_list);
1622 	INIT_WORK(&dlm->dispatched_work, dlm_dispatch_work);
1623 
1624 	kref_init(&dlm->dlm_refs);
1625 	dlm->dlm_state = DLM_CTXT_NEW;
1626 
1627 	INIT_LIST_HEAD(&dlm->dlm_eviction_callbacks);
1628 
1629 	mlog(0, "context init: refcount %u\n",
1630 		  atomic_read(&dlm->dlm_refs.refcount));
1631 
1632 leave:
1633 	return dlm;
1634 }
1635 
1636 /*
1637  * Compare a requested locking protocol version against the current one.
1638  *
1639  * If the major numbers are different, they are incompatible.
1640  * If the current minor is greater than the request, they are incompatible.
1641  * If the current minor is less than or equal to the request, they are
1642  * compatible, and the requester should run at the current minor version.
1643  */
1644 static int dlm_protocol_compare(struct dlm_protocol_version *existing,
1645 				struct dlm_protocol_version *request)
1646 {
1647 	if (existing->pv_major != request->pv_major)
1648 		return 1;
1649 
1650 	if (existing->pv_minor > request->pv_minor)
1651 		return 1;
1652 
1653 	if (existing->pv_minor < request->pv_minor)
1654 		request->pv_minor = existing->pv_minor;
1655 
1656 	return 0;
1657 }
1658 
1659 /*
1660  * dlm_register_domain: one-time setup per "domain".
1661  *
1662  * The filesystem passes in the requested locking version via proto.
1663  * If registration was successful, proto will contain the negotiated
1664  * locking protocol.
1665  */
1666 struct dlm_ctxt * dlm_register_domain(const char *domain,
1667 			       u32 key,
1668 			       struct dlm_protocol_version *fs_proto)
1669 {
1670 	int ret;
1671 	struct dlm_ctxt *dlm = NULL;
1672 	struct dlm_ctxt *new_ctxt = NULL;
1673 
1674 	if (strlen(domain) >= O2NM_MAX_NAME_LEN) {
1675 		ret = -ENAMETOOLONG;
1676 		mlog(ML_ERROR, "domain name length too long\n");
1677 		goto leave;
1678 	}
1679 
1680 	if (!o2hb_check_local_node_heartbeating()) {
1681 		mlog(ML_ERROR, "the local node has not been configured, or is "
1682 		     "not heartbeating\n");
1683 		ret = -EPROTO;
1684 		goto leave;
1685 	}
1686 
1687 	mlog(0, "register called for domain \"%s\"\n", domain);
1688 
1689 retry:
1690 	dlm = NULL;
1691 	if (signal_pending(current)) {
1692 		ret = -ERESTARTSYS;
1693 		mlog_errno(ret);
1694 		goto leave;
1695 	}
1696 
1697 	spin_lock(&dlm_domain_lock);
1698 
1699 	dlm = __dlm_lookup_domain(domain);
1700 	if (dlm) {
1701 		if (dlm->dlm_state != DLM_CTXT_JOINED) {
1702 			spin_unlock(&dlm_domain_lock);
1703 
1704 			mlog(0, "This ctxt is not joined yet!\n");
1705 			wait_event_interruptible(dlm_domain_events,
1706 						 dlm_wait_on_domain_helper(
1707 							 domain));
1708 			goto retry;
1709 		}
1710 
1711 		if (dlm_protocol_compare(&dlm->fs_locking_proto, fs_proto)) {
1712 			spin_unlock(&dlm_domain_lock);
1713 			mlog(ML_ERROR,
1714 			     "Requested locking protocol version is not "
1715 			     "compatible with already registered domain "
1716 			     "\"%s\"\n", domain);
1717 			ret = -EPROTO;
1718 			goto leave;
1719 		}
1720 
1721 		__dlm_get(dlm);
1722 		dlm->num_joins++;
1723 
1724 		spin_unlock(&dlm_domain_lock);
1725 
1726 		ret = 0;
1727 		goto leave;
1728 	}
1729 
1730 	/* doesn't exist */
1731 	if (!new_ctxt) {
1732 		spin_unlock(&dlm_domain_lock);
1733 
1734 		new_ctxt = dlm_alloc_ctxt(domain, key);
1735 		if (new_ctxt)
1736 			goto retry;
1737 
1738 		ret = -ENOMEM;
1739 		mlog_errno(ret);
1740 		goto leave;
1741 	}
1742 
1743 	/* a little variable switch-a-roo here... */
1744 	dlm = new_ctxt;
1745 	new_ctxt = NULL;
1746 
1747 	/* add the new domain */
1748 	list_add_tail(&dlm->list, &dlm_domains);
1749 	spin_unlock(&dlm_domain_lock);
1750 
1751 	/*
1752 	 * Pass the locking protocol version into the join.  If the join
1753 	 * succeeds, it will have the negotiated protocol set.
1754 	 */
1755 	dlm->dlm_locking_proto = dlm_protocol;
1756 	dlm->fs_locking_proto = *fs_proto;
1757 
1758 	ret = dlm_join_domain(dlm);
1759 	if (ret) {
1760 		mlog_errno(ret);
1761 		dlm_put(dlm);
1762 		goto leave;
1763 	}
1764 
1765 	/* Tell the caller what locking protocol we negotiated */
1766 	*fs_proto = dlm->fs_locking_proto;
1767 
1768 	ret = 0;
1769 leave:
1770 	if (new_ctxt)
1771 		dlm_free_ctxt_mem(new_ctxt);
1772 
1773 	if (ret < 0)
1774 		dlm = ERR_PTR(ret);
1775 
1776 	return dlm;
1777 }
1778 EXPORT_SYMBOL_GPL(dlm_register_domain);
1779 
1780 static LIST_HEAD(dlm_join_handlers);
1781 
1782 static void dlm_unregister_net_handlers(void)
1783 {
1784 	o2net_unregister_handler_list(&dlm_join_handlers);
1785 }
1786 
1787 static int dlm_register_net_handlers(void)
1788 {
1789 	int status = 0;
1790 
1791 	status = o2net_register_handler(DLM_QUERY_JOIN_MSG, DLM_MOD_KEY,
1792 					sizeof(struct dlm_query_join_request),
1793 					dlm_query_join_handler,
1794 					NULL, NULL, &dlm_join_handlers);
1795 	if (status)
1796 		goto bail;
1797 
1798 	status = o2net_register_handler(DLM_ASSERT_JOINED_MSG, DLM_MOD_KEY,
1799 					sizeof(struct dlm_assert_joined),
1800 					dlm_assert_joined_handler,
1801 					NULL, NULL, &dlm_join_handlers);
1802 	if (status)
1803 		goto bail;
1804 
1805 	status = o2net_register_handler(DLM_CANCEL_JOIN_MSG, DLM_MOD_KEY,
1806 					sizeof(struct dlm_cancel_join),
1807 					dlm_cancel_join_handler,
1808 					NULL, NULL, &dlm_join_handlers);
1809 
1810 bail:
1811 	if (status < 0)
1812 		dlm_unregister_net_handlers();
1813 
1814 	return status;
1815 }
1816 
1817 /* Domain eviction callback handling.
1818  *
1819  * The file system requires notification of node death *before* the
1820  * dlm completes it's recovery work, otherwise it may be able to
1821  * acquire locks on resources requiring recovery. Since the dlm can
1822  * evict a node from it's domain *before* heartbeat fires, a similar
1823  * mechanism is required. */
1824 
1825 /* Eviction is not expected to happen often, so a per-domain lock is
1826  * not necessary. Eviction callbacks are allowed to sleep for short
1827  * periods of time. */
1828 static DECLARE_RWSEM(dlm_callback_sem);
1829 
1830 void dlm_fire_domain_eviction_callbacks(struct dlm_ctxt *dlm,
1831 					int node_num)
1832 {
1833 	struct list_head *iter;
1834 	struct dlm_eviction_cb *cb;
1835 
1836 	down_read(&dlm_callback_sem);
1837 	list_for_each(iter, &dlm->dlm_eviction_callbacks) {
1838 		cb = list_entry(iter, struct dlm_eviction_cb, ec_item);
1839 
1840 		cb->ec_func(node_num, cb->ec_data);
1841 	}
1842 	up_read(&dlm_callback_sem);
1843 }
1844 
1845 void dlm_setup_eviction_cb(struct dlm_eviction_cb *cb,
1846 			   dlm_eviction_func *f,
1847 			   void *data)
1848 {
1849 	INIT_LIST_HEAD(&cb->ec_item);
1850 	cb->ec_func = f;
1851 	cb->ec_data = data;
1852 }
1853 EXPORT_SYMBOL_GPL(dlm_setup_eviction_cb);
1854 
1855 void dlm_register_eviction_cb(struct dlm_ctxt *dlm,
1856 			      struct dlm_eviction_cb *cb)
1857 {
1858 	down_write(&dlm_callback_sem);
1859 	list_add_tail(&cb->ec_item, &dlm->dlm_eviction_callbacks);
1860 	up_write(&dlm_callback_sem);
1861 }
1862 EXPORT_SYMBOL_GPL(dlm_register_eviction_cb);
1863 
1864 void dlm_unregister_eviction_cb(struct dlm_eviction_cb *cb)
1865 {
1866 	down_write(&dlm_callback_sem);
1867 	list_del_init(&cb->ec_item);
1868 	up_write(&dlm_callback_sem);
1869 }
1870 EXPORT_SYMBOL_GPL(dlm_unregister_eviction_cb);
1871 
1872 static int __init dlm_init(void)
1873 {
1874 	int status;
1875 
1876 	dlm_print_version();
1877 
1878 	status = dlm_init_mle_cache();
1879 	if (status) {
1880 		mlog(ML_ERROR, "Could not create o2dlm_mle slabcache\n");
1881 		goto error;
1882 	}
1883 
1884 	status = dlm_init_master_caches();
1885 	if (status) {
1886 		mlog(ML_ERROR, "Could not create o2dlm_lockres and "
1887 		     "o2dlm_lockname slabcaches\n");
1888 		goto error;
1889 	}
1890 
1891 	status = dlm_init_lock_cache();
1892 	if (status) {
1893 		mlog(ML_ERROR, "Count not create o2dlm_lock slabcache\n");
1894 		goto error;
1895 	}
1896 
1897 	status = dlm_register_net_handlers();
1898 	if (status) {
1899 		mlog(ML_ERROR, "Unable to register network handlers\n");
1900 		goto error;
1901 	}
1902 
1903 	status = dlm_create_debugfs_root();
1904 	if (status)
1905 		goto error;
1906 
1907 	return 0;
1908 error:
1909 	dlm_unregister_net_handlers();
1910 	dlm_destroy_lock_cache();
1911 	dlm_destroy_master_caches();
1912 	dlm_destroy_mle_cache();
1913 	return -1;
1914 }
1915 
1916 static void __exit dlm_exit (void)
1917 {
1918 	dlm_destroy_debugfs_root();
1919 	dlm_unregister_net_handlers();
1920 	dlm_destroy_lock_cache();
1921 	dlm_destroy_master_caches();
1922 	dlm_destroy_mle_cache();
1923 }
1924 
1925 MODULE_AUTHOR("Oracle");
1926 MODULE_LICENSE("GPL");
1927 
1928 module_init(dlm_init);
1929 module_exit(dlm_exit);
1930