xref: /linux/fs/ocfs2/cluster/nodemanager.c (revision 26fbb4c8c7c3ee9a4c3b4de555a8587b5a19154e)
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /* -*- mode: c; c-basic-offset: 8; -*-
3  * vim: noexpandtab sw=8 ts=8 sts=0:
4  *
5  * Copyright (C) 2004, 2005 Oracle.  All rights reserved.
6  */
7 
8 #include <linux/slab.h>
9 #include <linux/kernel.h>
10 #include <linux/module.h>
11 #include <linux/configfs.h>
12 
13 #include "tcp.h"
14 #include "nodemanager.h"
15 #include "heartbeat.h"
16 #include "masklog.h"
17 #include "sys.h"
18 
19 /* for now we operate under the assertion that there can be only one
20  * cluster active at a time.  Changing this will require trickling
21  * cluster references throughout where nodes are looked up */
22 struct o2nm_cluster *o2nm_single_cluster = NULL;
23 
24 static const char *o2nm_fence_method_desc[O2NM_FENCE_METHODS] = {
25 	"reset",	/* O2NM_FENCE_RESET */
26 	"panic",	/* O2NM_FENCE_PANIC */
27 };
28 
29 static inline void o2nm_lock_subsystem(void);
30 static inline void o2nm_unlock_subsystem(void);
31 
32 struct o2nm_node *o2nm_get_node_by_num(u8 node_num)
33 {
34 	struct o2nm_node *node = NULL;
35 
36 	if (node_num >= O2NM_MAX_NODES || o2nm_single_cluster == NULL)
37 		goto out;
38 
39 	read_lock(&o2nm_single_cluster->cl_nodes_lock);
40 	node = o2nm_single_cluster->cl_nodes[node_num];
41 	if (node)
42 		config_item_get(&node->nd_item);
43 	read_unlock(&o2nm_single_cluster->cl_nodes_lock);
44 out:
45 	return node;
46 }
47 EXPORT_SYMBOL_GPL(o2nm_get_node_by_num);
48 
49 int o2nm_configured_node_map(unsigned long *map, unsigned bytes)
50 {
51 	struct o2nm_cluster *cluster = o2nm_single_cluster;
52 
53 	BUG_ON(bytes < (sizeof(cluster->cl_nodes_bitmap)));
54 
55 	if (cluster == NULL)
56 		return -EINVAL;
57 
58 	read_lock(&cluster->cl_nodes_lock);
59 	memcpy(map, cluster->cl_nodes_bitmap, sizeof(cluster->cl_nodes_bitmap));
60 	read_unlock(&cluster->cl_nodes_lock);
61 
62 	return 0;
63 }
64 EXPORT_SYMBOL_GPL(o2nm_configured_node_map);
65 
66 static struct o2nm_node *o2nm_node_ip_tree_lookup(struct o2nm_cluster *cluster,
67 						  __be32 ip_needle,
68 						  struct rb_node ***ret_p,
69 						  struct rb_node **ret_parent)
70 {
71 	struct rb_node **p = &cluster->cl_node_ip_tree.rb_node;
72 	struct rb_node *parent = NULL;
73 	struct o2nm_node *node, *ret = NULL;
74 
75 	while (*p) {
76 		int cmp;
77 
78 		parent = *p;
79 		node = rb_entry(parent, struct o2nm_node, nd_ip_node);
80 
81 		cmp = memcmp(&ip_needle, &node->nd_ipv4_address,
82 				sizeof(ip_needle));
83 		if (cmp < 0)
84 			p = &(*p)->rb_left;
85 		else if (cmp > 0)
86 			p = &(*p)->rb_right;
87 		else {
88 			ret = node;
89 			break;
90 		}
91 	}
92 
93 	if (ret_p != NULL)
94 		*ret_p = p;
95 	if (ret_parent != NULL)
96 		*ret_parent = parent;
97 
98 	return ret;
99 }
100 
101 struct o2nm_node *o2nm_get_node_by_ip(__be32 addr)
102 {
103 	struct o2nm_node *node = NULL;
104 	struct o2nm_cluster *cluster = o2nm_single_cluster;
105 
106 	if (cluster == NULL)
107 		goto out;
108 
109 	read_lock(&cluster->cl_nodes_lock);
110 	node = o2nm_node_ip_tree_lookup(cluster, addr, NULL, NULL);
111 	if (node)
112 		config_item_get(&node->nd_item);
113 	read_unlock(&cluster->cl_nodes_lock);
114 
115 out:
116 	return node;
117 }
118 EXPORT_SYMBOL_GPL(o2nm_get_node_by_ip);
119 
120 void o2nm_node_put(struct o2nm_node *node)
121 {
122 	config_item_put(&node->nd_item);
123 }
124 EXPORT_SYMBOL_GPL(o2nm_node_put);
125 
126 void o2nm_node_get(struct o2nm_node *node)
127 {
128 	config_item_get(&node->nd_item);
129 }
130 EXPORT_SYMBOL_GPL(o2nm_node_get);
131 
132 u8 o2nm_this_node(void)
133 {
134 	u8 node_num = O2NM_MAX_NODES;
135 
136 	if (o2nm_single_cluster && o2nm_single_cluster->cl_has_local)
137 		node_num = o2nm_single_cluster->cl_local_node;
138 
139 	return node_num;
140 }
141 EXPORT_SYMBOL_GPL(o2nm_this_node);
142 
143 /* node configfs bits */
144 
145 static struct o2nm_cluster *to_o2nm_cluster(struct config_item *item)
146 {
147 	return item ?
148 		container_of(to_config_group(item), struct o2nm_cluster,
149 			     cl_group)
150 		: NULL;
151 }
152 
153 static struct o2nm_node *to_o2nm_node(struct config_item *item)
154 {
155 	return item ? container_of(item, struct o2nm_node, nd_item) : NULL;
156 }
157 
/* configfs ->release() for a node item: free the enclosing o2nm_node. */
static void o2nm_node_release(struct config_item *item)
{
	kfree(to_o2nm_node(item));
}
163 
164 static ssize_t o2nm_node_num_show(struct config_item *item, char *page)
165 {
166 	return sprintf(page, "%d\n", to_o2nm_node(item)->nd_num);
167 }
168 
169 static struct o2nm_cluster *to_o2nm_cluster_from_node(struct o2nm_node *node)
170 {
171 	/* through the first node_set .parent
172 	 * mycluster/nodes/mynode == o2nm_cluster->o2nm_node_group->o2nm_node */
173 	if (node->nd_item.ci_parent)
174 		return to_o2nm_cluster(node->nd_item.ci_parent->ci_parent);
175 	else
176 		return NULL;
177 }
178 
/*
 * Bit numbers within o2nm_node.nd_set_attributes; a set bit records that
 * the corresponding attribute has been written.
 */
enum {
	O2NM_NODE_ATTR_NUM	= 0,
	O2NM_NODE_ATTR_PORT	= 1,
	O2NM_NODE_ATTR_ADDRESS	= 2,
};
184 
185 static ssize_t o2nm_node_num_store(struct config_item *item, const char *page,
186 				   size_t count)
187 {
188 	struct o2nm_node *node = to_o2nm_node(item);
189 	struct o2nm_cluster *cluster;
190 	unsigned long tmp;
191 	char *p = (char *)page;
192 	int ret = 0;
193 
194 	tmp = simple_strtoul(p, &p, 0);
195 	if (!p || (*p && (*p != '\n')))
196 		return -EINVAL;
197 
198 	if (tmp >= O2NM_MAX_NODES)
199 		return -ERANGE;
200 
201 	/* once we're in the cl_nodes tree networking can look us up by
202 	 * node number and try to use our address and port attributes
203 	 * to connect to this node.. make sure that they've been set
204 	 * before writing the node attribute? */
205 	if (!test_bit(O2NM_NODE_ATTR_ADDRESS, &node->nd_set_attributes) ||
206 	    !test_bit(O2NM_NODE_ATTR_PORT, &node->nd_set_attributes))
207 		return -EINVAL; /* XXX */
208 
209 	o2nm_lock_subsystem();
210 	cluster = to_o2nm_cluster_from_node(node);
211 	if (!cluster) {
212 		o2nm_unlock_subsystem();
213 		return -EINVAL;
214 	}
215 
216 	write_lock(&cluster->cl_nodes_lock);
217 	if (cluster->cl_nodes[tmp])
218 		ret = -EEXIST;
219 	else if (test_and_set_bit(O2NM_NODE_ATTR_NUM,
220 			&node->nd_set_attributes))
221 		ret = -EBUSY;
222 	else  {
223 		cluster->cl_nodes[tmp] = node;
224 		node->nd_num = tmp;
225 		set_bit(tmp, cluster->cl_nodes_bitmap);
226 	}
227 	write_unlock(&cluster->cl_nodes_lock);
228 	o2nm_unlock_subsystem();
229 
230 	if (ret)
231 		return ret;
232 
233 	return count;
234 }
235 static ssize_t o2nm_node_ipv4_port_show(struct config_item *item, char *page)
236 {
237 	return sprintf(page, "%u\n", ntohs(to_o2nm_node(item)->nd_ipv4_port));
238 }
239 
240 static ssize_t o2nm_node_ipv4_port_store(struct config_item *item,
241 					 const char *page, size_t count)
242 {
243 	struct o2nm_node *node = to_o2nm_node(item);
244 	unsigned long tmp;
245 	char *p = (char *)page;
246 
247 	tmp = simple_strtoul(p, &p, 0);
248 	if (!p || (*p && (*p != '\n')))
249 		return -EINVAL;
250 
251 	if (tmp == 0)
252 		return -EINVAL;
253 	if (tmp >= (u16)-1)
254 		return -ERANGE;
255 
256 	if (test_and_set_bit(O2NM_NODE_ATTR_PORT, &node->nd_set_attributes))
257 		return -EBUSY;
258 	node->nd_ipv4_port = htons(tmp);
259 
260 	return count;
261 }
262 
263 static ssize_t o2nm_node_ipv4_address_show(struct config_item *item, char *page)
264 {
265 	return sprintf(page, "%pI4\n", &to_o2nm_node(item)->nd_ipv4_address);
266 }
267 
268 static ssize_t o2nm_node_ipv4_address_store(struct config_item *item,
269 					    const char *page,
270 					    size_t count)
271 {
272 	struct o2nm_node *node = to_o2nm_node(item);
273 	struct o2nm_cluster *cluster;
274 	int ret, i;
275 	struct rb_node **p, *parent;
276 	unsigned int octets[4];
277 	__be32 ipv4_addr = 0;
278 
279 	ret = sscanf(page, "%3u.%3u.%3u.%3u", &octets[3], &octets[2],
280 		     &octets[1], &octets[0]);
281 	if (ret != 4)
282 		return -EINVAL;
283 
284 	for (i = 0; i < ARRAY_SIZE(octets); i++) {
285 		if (octets[i] > 255)
286 			return -ERANGE;
287 		be32_add_cpu(&ipv4_addr, octets[i] << (i * 8));
288 	}
289 
290 	o2nm_lock_subsystem();
291 	cluster = to_o2nm_cluster_from_node(node);
292 	if (!cluster) {
293 		o2nm_unlock_subsystem();
294 		return -EINVAL;
295 	}
296 
297 	ret = 0;
298 	write_lock(&cluster->cl_nodes_lock);
299 	if (o2nm_node_ip_tree_lookup(cluster, ipv4_addr, &p, &parent))
300 		ret = -EEXIST;
301 	else if (test_and_set_bit(O2NM_NODE_ATTR_ADDRESS,
302 			&node->nd_set_attributes))
303 		ret = -EBUSY;
304 	else {
305 		rb_link_node(&node->nd_ip_node, parent, p);
306 		rb_insert_color(&node->nd_ip_node, &cluster->cl_node_ip_tree);
307 	}
308 	write_unlock(&cluster->cl_nodes_lock);
309 	o2nm_unlock_subsystem();
310 
311 	if (ret)
312 		return ret;
313 
314 	memcpy(&node->nd_ipv4_address, &ipv4_addr, sizeof(ipv4_addr));
315 
316 	return count;
317 }
318 
319 static ssize_t o2nm_node_local_show(struct config_item *item, char *page)
320 {
321 	return sprintf(page, "%d\n", to_o2nm_node(item)->nd_local);
322 }
323 
324 static ssize_t o2nm_node_local_store(struct config_item *item, const char *page,
325 				     size_t count)
326 {
327 	struct o2nm_node *node = to_o2nm_node(item);
328 	struct o2nm_cluster *cluster;
329 	unsigned long tmp;
330 	char *p = (char *)page;
331 	ssize_t ret;
332 
333 	tmp = simple_strtoul(p, &p, 0);
334 	if (!p || (*p && (*p != '\n')))
335 		return -EINVAL;
336 
337 	tmp = !!tmp; /* boolean of whether this node wants to be local */
338 
339 	/* setting local turns on networking rx for now so we require having
340 	 * set everything else first */
341 	if (!test_bit(O2NM_NODE_ATTR_ADDRESS, &node->nd_set_attributes) ||
342 	    !test_bit(O2NM_NODE_ATTR_NUM, &node->nd_set_attributes) ||
343 	    !test_bit(O2NM_NODE_ATTR_PORT, &node->nd_set_attributes))
344 		return -EINVAL; /* XXX */
345 
346 	o2nm_lock_subsystem();
347 	cluster = to_o2nm_cluster_from_node(node);
348 	if (!cluster) {
349 		ret = -EINVAL;
350 		goto out;
351 	}
352 
353 	/* the only failure case is trying to set a new local node
354 	 * when a different one is already set */
355 	if (tmp && tmp == cluster->cl_has_local &&
356 	    cluster->cl_local_node != node->nd_num) {
357 		ret = -EBUSY;
358 		goto out;
359 	}
360 
361 	/* bring up the rx thread if we're setting the new local node. */
362 	if (tmp && !cluster->cl_has_local) {
363 		ret = o2net_start_listening(node);
364 		if (ret)
365 			goto out;
366 	}
367 
368 	if (!tmp && cluster->cl_has_local &&
369 	    cluster->cl_local_node == node->nd_num) {
370 		o2net_stop_listening(node);
371 		cluster->cl_local_node = O2NM_INVALID_NODE_NUM;
372 	}
373 
374 	node->nd_local = tmp;
375 	if (node->nd_local) {
376 		cluster->cl_has_local = tmp;
377 		cluster->cl_local_node = node->nd_num;
378 	}
379 
380 	ret = count;
381 
382 out:
383 	o2nm_unlock_subsystem();
384 	return ret;
385 }
386 
387 CONFIGFS_ATTR(o2nm_node_, num);
388 CONFIGFS_ATTR(o2nm_node_, ipv4_port);
389 CONFIGFS_ATTR(o2nm_node_, ipv4_address);
390 CONFIGFS_ATTR(o2nm_node_, local);
391 
392 static struct configfs_attribute *o2nm_node_attrs[] = {
393 	&o2nm_node_attr_num,
394 	&o2nm_node_attr_ipv4_port,
395 	&o2nm_node_attr_ipv4_address,
396 	&o2nm_node_attr_local,
397 	NULL,
398 };
399 
400 static struct configfs_item_operations o2nm_node_item_ops = {
401 	.release		= o2nm_node_release,
402 };
403 
404 static const struct config_item_type o2nm_node_type = {
405 	.ct_item_ops	= &o2nm_node_item_ops,
406 	.ct_attrs	= o2nm_node_attrs,
407 	.ct_owner	= THIS_MODULE,
408 };
409 
410 /* node set */
411 
412 struct o2nm_node_group {
413 	struct config_group ns_group;
414 	/* some stuff? */
415 };
416 
417 #if 0
418 static struct o2nm_node_group *to_o2nm_node_group(struct config_group *group)
419 {
420 	return group ?
421 		container_of(group, struct o2nm_node_group, ns_group)
422 		: NULL;
423 }
424 #endif
425 
426 static ssize_t o2nm_cluster_attr_write(const char *page, ssize_t count,
427                                        unsigned int *val)
428 {
429 	unsigned long tmp;
430 	char *p = (char *)page;
431 
432 	tmp = simple_strtoul(p, &p, 0);
433 	if (!p || (*p && (*p != '\n')))
434 		return -EINVAL;
435 
436 	if (tmp == 0)
437 		return -EINVAL;
438 	if (tmp >= (u32)-1)
439 		return -ERANGE;
440 
441 	*val = tmp;
442 
443 	return count;
444 }
445 
446 static ssize_t o2nm_cluster_idle_timeout_ms_show(struct config_item *item,
447 	char *page)
448 {
449 	return sprintf(page, "%u\n", to_o2nm_cluster(item)->cl_idle_timeout_ms);
450 }
451 
452 static ssize_t o2nm_cluster_idle_timeout_ms_store(struct config_item *item,
453 	const char *page, size_t count)
454 {
455 	struct o2nm_cluster *cluster = to_o2nm_cluster(item);
456 	ssize_t ret;
457 	unsigned int val;
458 
459 	ret =  o2nm_cluster_attr_write(page, count, &val);
460 
461 	if (ret > 0) {
462 		if (cluster->cl_idle_timeout_ms != val
463 			&& o2net_num_connected_peers()) {
464 			mlog(ML_NOTICE,
465 			     "o2net: cannot change idle timeout after "
466 			     "the first peer has agreed to it."
467 			     "  %d connected peers\n",
468 			     o2net_num_connected_peers());
469 			ret = -EINVAL;
470 		} else if (val <= cluster->cl_keepalive_delay_ms) {
471 			mlog(ML_NOTICE, "o2net: idle timeout must be larger "
472 			     "than keepalive delay\n");
473 			ret = -EINVAL;
474 		} else {
475 			cluster->cl_idle_timeout_ms = val;
476 		}
477 	}
478 
479 	return ret;
480 }
481 
482 static ssize_t o2nm_cluster_keepalive_delay_ms_show(
483 	struct config_item *item, char *page)
484 {
485 	return sprintf(page, "%u\n",
486 			to_o2nm_cluster(item)->cl_keepalive_delay_ms);
487 }
488 
489 static ssize_t o2nm_cluster_keepalive_delay_ms_store(
490 	struct config_item *item, const char *page, size_t count)
491 {
492 	struct o2nm_cluster *cluster = to_o2nm_cluster(item);
493 	ssize_t ret;
494 	unsigned int val;
495 
496 	ret =  o2nm_cluster_attr_write(page, count, &val);
497 
498 	if (ret > 0) {
499 		if (cluster->cl_keepalive_delay_ms != val
500 		    && o2net_num_connected_peers()) {
501 			mlog(ML_NOTICE,
502 			     "o2net: cannot change keepalive delay after"
503 			     " the first peer has agreed to it."
504 			     "  %d connected peers\n",
505 			     o2net_num_connected_peers());
506 			ret = -EINVAL;
507 		} else if (val >= cluster->cl_idle_timeout_ms) {
508 			mlog(ML_NOTICE, "o2net: keepalive delay must be "
509 			     "smaller than idle timeout\n");
510 			ret = -EINVAL;
511 		} else {
512 			cluster->cl_keepalive_delay_ms = val;
513 		}
514 	}
515 
516 	return ret;
517 }
518 
519 static ssize_t o2nm_cluster_reconnect_delay_ms_show(
520 	struct config_item *item, char *page)
521 {
522 	return sprintf(page, "%u\n",
523 			to_o2nm_cluster(item)->cl_reconnect_delay_ms);
524 }
525 
526 static ssize_t o2nm_cluster_reconnect_delay_ms_store(
527 	struct config_item *item, const char *page, size_t count)
528 {
529 	return o2nm_cluster_attr_write(page, count,
530                                &to_o2nm_cluster(item)->cl_reconnect_delay_ms);
531 }
532 
533 static ssize_t o2nm_cluster_fence_method_show(
534 	struct config_item *item, char *page)
535 {
536 	struct o2nm_cluster *cluster = to_o2nm_cluster(item);
537 	ssize_t ret = 0;
538 
539 	if (cluster)
540 		ret = sprintf(page, "%s\n",
541 			      o2nm_fence_method_desc[cluster->cl_fence_method]);
542 	return ret;
543 }
544 
545 static ssize_t o2nm_cluster_fence_method_store(
546 	struct config_item *item, const char *page, size_t count)
547 {
548 	unsigned int i;
549 
550 	if (page[count - 1] != '\n')
551 		goto bail;
552 
553 	for (i = 0; i < O2NM_FENCE_METHODS; ++i) {
554 		if (count != strlen(o2nm_fence_method_desc[i]) + 1)
555 			continue;
556 		if (strncasecmp(page, o2nm_fence_method_desc[i], count - 1))
557 			continue;
558 		if (to_o2nm_cluster(item)->cl_fence_method != i) {
559 			printk(KERN_INFO "ocfs2: Changing fence method to %s\n",
560 			       o2nm_fence_method_desc[i]);
561 			to_o2nm_cluster(item)->cl_fence_method = i;
562 		}
563 		return count;
564 	}
565 
566 bail:
567 	return -EINVAL;
568 }
569 
570 CONFIGFS_ATTR(o2nm_cluster_, idle_timeout_ms);
571 CONFIGFS_ATTR(o2nm_cluster_, keepalive_delay_ms);
572 CONFIGFS_ATTR(o2nm_cluster_, reconnect_delay_ms);
573 CONFIGFS_ATTR(o2nm_cluster_, fence_method);
574 
575 static struct configfs_attribute *o2nm_cluster_attrs[] = {
576 	&o2nm_cluster_attr_idle_timeout_ms,
577 	&o2nm_cluster_attr_keepalive_delay_ms,
578 	&o2nm_cluster_attr_reconnect_delay_ms,
579 	&o2nm_cluster_attr_fence_method,
580 	NULL,
581 };
582 
583 static struct config_item *o2nm_node_group_make_item(struct config_group *group,
584 						     const char *name)
585 {
586 	struct o2nm_node *node = NULL;
587 
588 	if (strlen(name) > O2NM_MAX_NAME_LEN)
589 		return ERR_PTR(-ENAMETOOLONG);
590 
591 	node = kzalloc(sizeof(struct o2nm_node), GFP_KERNEL);
592 	if (node == NULL)
593 		return ERR_PTR(-ENOMEM);
594 
595 	strcpy(node->nd_name, name); /* use item.ci_namebuf instead? */
596 	config_item_init_type_name(&node->nd_item, name, &o2nm_node_type);
597 	spin_lock_init(&node->nd_lock);
598 
599 	mlog(ML_CLUSTER, "o2nm: Registering node %s\n", name);
600 
601 	return &node->nd_item;
602 }
603 
604 static void o2nm_node_group_drop_item(struct config_group *group,
605 				      struct config_item *item)
606 {
607 	struct o2nm_node *node = to_o2nm_node(item);
608 	struct o2nm_cluster *cluster = to_o2nm_cluster(group->cg_item.ci_parent);
609 
610 	if (cluster->cl_nodes[node->nd_num] == node) {
611 		o2net_disconnect_node(node);
612 
613 		if (cluster->cl_has_local &&
614 		    (cluster->cl_local_node == node->nd_num)) {
615 			cluster->cl_has_local = 0;
616 			cluster->cl_local_node = O2NM_INVALID_NODE_NUM;
617 			o2net_stop_listening(node);
618 		}
619 	}
620 
621 	/* XXX call into net to stop this node from trading messages */
622 
623 	write_lock(&cluster->cl_nodes_lock);
624 
625 	/* XXX sloppy */
626 	if (node->nd_ipv4_address)
627 		rb_erase(&node->nd_ip_node, &cluster->cl_node_ip_tree);
628 
629 	/* nd_num might be 0 if the node number hasn't been set.. */
630 	if (cluster->cl_nodes[node->nd_num] == node) {
631 		cluster->cl_nodes[node->nd_num] = NULL;
632 		clear_bit(node->nd_num, cluster->cl_nodes_bitmap);
633 	}
634 	write_unlock(&cluster->cl_nodes_lock);
635 
636 	mlog(ML_CLUSTER, "o2nm: Unregistered node %s\n",
637 	     config_item_name(&node->nd_item));
638 
639 	config_item_put(item);
640 }
641 
642 static struct configfs_group_operations o2nm_node_group_group_ops = {
643 	.make_item	= o2nm_node_group_make_item,
644 	.drop_item	= o2nm_node_group_drop_item,
645 };
646 
647 static const struct config_item_type o2nm_node_group_type = {
648 	.ct_group_ops	= &o2nm_node_group_group_ops,
649 	.ct_owner	= THIS_MODULE,
650 };
651 
652 /* cluster */
653 
/* configfs ->release() for a cluster item: free the enclosing o2nm_cluster. */
static void o2nm_cluster_release(struct config_item *item)
{
	kfree(to_o2nm_cluster(item));
}
660 
661 static struct configfs_item_operations o2nm_cluster_item_ops = {
662 	.release	= o2nm_cluster_release,
663 };
664 
665 static const struct config_item_type o2nm_cluster_type = {
666 	.ct_item_ops	= &o2nm_cluster_item_ops,
667 	.ct_attrs	= o2nm_cluster_attrs,
668 	.ct_owner	= THIS_MODULE,
669 };
670 
671 /* cluster set */
672 
673 struct o2nm_cluster_group {
674 	struct configfs_subsystem cs_subsys;
675 	/* some stuff? */
676 };
677 
678 #if 0
679 static struct o2nm_cluster_group *to_o2nm_cluster_group(struct config_group *group)
680 {
681 	return group ?
682 		container_of(to_configfs_subsystem(group), struct o2nm_cluster_group, cs_subsys)
683 	       : NULL;
684 }
685 #endif
686 
687 static struct config_group *o2nm_cluster_group_make_group(struct config_group *group,
688 							  const char *name)
689 {
690 	struct o2nm_cluster *cluster = NULL;
691 	struct o2nm_node_group *ns = NULL;
692 	struct config_group *o2hb_group = NULL, *ret = NULL;
693 
694 	/* this runs under the parent dir's i_mutex; there can be only
695 	 * one caller in here at a time */
696 	if (o2nm_single_cluster)
697 		return ERR_PTR(-ENOSPC);
698 
699 	cluster = kzalloc(sizeof(struct o2nm_cluster), GFP_KERNEL);
700 	ns = kzalloc(sizeof(struct o2nm_node_group), GFP_KERNEL);
701 	o2hb_group = o2hb_alloc_hb_set();
702 	if (cluster == NULL || ns == NULL || o2hb_group == NULL)
703 		goto out;
704 
705 	config_group_init_type_name(&cluster->cl_group, name,
706 				    &o2nm_cluster_type);
707 	configfs_add_default_group(&ns->ns_group, &cluster->cl_group);
708 
709 	config_group_init_type_name(&ns->ns_group, "node",
710 				    &o2nm_node_group_type);
711 	configfs_add_default_group(o2hb_group, &cluster->cl_group);
712 
713 	rwlock_init(&cluster->cl_nodes_lock);
714 	cluster->cl_node_ip_tree = RB_ROOT;
715 	cluster->cl_reconnect_delay_ms = O2NET_RECONNECT_DELAY_MS_DEFAULT;
716 	cluster->cl_idle_timeout_ms    = O2NET_IDLE_TIMEOUT_MS_DEFAULT;
717 	cluster->cl_keepalive_delay_ms = O2NET_KEEPALIVE_DELAY_MS_DEFAULT;
718 	cluster->cl_fence_method       = O2NM_FENCE_RESET;
719 
720 	ret = &cluster->cl_group;
721 	o2nm_single_cluster = cluster;
722 
723 out:
724 	if (ret == NULL) {
725 		kfree(cluster);
726 		kfree(ns);
727 		o2hb_free_hb_set(o2hb_group);
728 		ret = ERR_PTR(-ENOMEM);
729 	}
730 
731 	return ret;
732 }
733 
734 static void o2nm_cluster_group_drop_item(struct config_group *group, struct config_item *item)
735 {
736 	struct o2nm_cluster *cluster = to_o2nm_cluster(item);
737 
738 	BUG_ON(o2nm_single_cluster != cluster);
739 	o2nm_single_cluster = NULL;
740 
741 	configfs_remove_default_groups(&cluster->cl_group);
742 	config_item_put(item);
743 }
744 
745 static struct configfs_group_operations o2nm_cluster_group_group_ops = {
746 	.make_group	= o2nm_cluster_group_make_group,
747 	.drop_item	= o2nm_cluster_group_drop_item,
748 };
749 
750 static const struct config_item_type o2nm_cluster_group_type = {
751 	.ct_group_ops	= &o2nm_cluster_group_group_ops,
752 	.ct_owner	= THIS_MODULE,
753 };
754 
755 static struct o2nm_cluster_group o2nm_cluster_group = {
756 	.cs_subsys = {
757 		.su_group = {
758 			.cg_item = {
759 				.ci_namebuf = "cluster",
760 				.ci_type = &o2nm_cluster_group_type,
761 			},
762 		},
763 	},
764 };
765 
766 static inline void o2nm_lock_subsystem(void)
767 {
768 	mutex_lock(&o2nm_cluster_group.cs_subsys.su_mutex);
769 }
770 
771 static inline void o2nm_unlock_subsystem(void)
772 {
773 	mutex_unlock(&o2nm_cluster_group.cs_subsys.su_mutex);
774 }
775 
776 int o2nm_depend_item(struct config_item *item)
777 {
778 	return configfs_depend_item(&o2nm_cluster_group.cs_subsys, item);
779 }
780 
/* Undo a dependency previously taken with o2nm_depend_item(). */
void o2nm_undepend_item(struct config_item *item)
{
	configfs_undepend_item(item);
}
785 
786 int o2nm_depend_this_node(void)
787 {
788 	int ret = 0;
789 	struct o2nm_node *local_node;
790 
791 	local_node = o2nm_get_node_by_num(o2nm_this_node());
792 	if (!local_node) {
793 		ret = -EINVAL;
794 		goto out;
795 	}
796 
797 	ret = o2nm_depend_item(&local_node->nd_item);
798 	o2nm_node_put(local_node);
799 
800 out:
801 	return ret;
802 }
803 
804 void o2nm_undepend_this_node(void)
805 {
806 	struct o2nm_node *local_node;
807 
808 	local_node = o2nm_get_node_by_num(o2nm_this_node());
809 	BUG_ON(!local_node);
810 
811 	o2nm_undepend_item(&local_node->nd_item);
812 	o2nm_node_put(local_node);
813 }
814 
815 
816 static void __exit exit_o2nm(void)
817 {
818 	/* XXX sync with hb callbacks and shut down hb? */
819 	o2net_unregister_hb_callbacks();
820 	configfs_unregister_subsystem(&o2nm_cluster_group.cs_subsys);
821 	o2cb_sys_shutdown();
822 
823 	o2net_exit();
824 	o2hb_exit();
825 }
826 
827 static int __init init_o2nm(void)
828 {
829 	int ret = -1;
830 
831 	o2hb_init();
832 
833 	ret = o2net_init();
834 	if (ret)
835 		goto out_o2hb;
836 
837 	ret = o2net_register_hb_callbacks();
838 	if (ret)
839 		goto out_o2net;
840 
841 	config_group_init(&o2nm_cluster_group.cs_subsys.su_group);
842 	mutex_init(&o2nm_cluster_group.cs_subsys.su_mutex);
843 	ret = configfs_register_subsystem(&o2nm_cluster_group.cs_subsys);
844 	if (ret) {
845 		printk(KERN_ERR "nodemanager: Registration returned %d\n", ret);
846 		goto out_callbacks;
847 	}
848 
849 	ret = o2cb_sys_init();
850 	if (!ret)
851 		goto out;
852 
853 	configfs_unregister_subsystem(&o2nm_cluster_group.cs_subsys);
854 out_callbacks:
855 	o2net_unregister_hb_callbacks();
856 out_o2net:
857 	o2net_exit();
858 out_o2hb:
859 	o2hb_exit();
860 out:
861 	return ret;
862 }
863 
864 MODULE_AUTHOR("Oracle");
865 MODULE_LICENSE("GPL");
866 MODULE_DESCRIPTION("OCFS2 cluster management");
867 
868 module_init(init_o2nm)
869 module_exit(exit_o2nm)
870