xref: /linux/fs/ocfs2/stack_user.c (revision 4d5e3b06e1fc1428be14cd4ebe3b37c1bb34f95d)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * stack_user.c
4  *
5  * Code which interfaces ocfs2 with fs/dlm and a userspace stack.
6  *
7  * Copyright (C) 2007 Oracle.  All rights reserved.
8  */
9 
10 #include <linux/module.h>
11 #include <linux/fs.h>
12 #include <linux/miscdevice.h>
13 #include <linux/mutex.h>
14 #include <linux/slab.h>
15 #include <linux/reboot.h>
16 #include <linux/sched.h>
17 #include <linux/uaccess.h>
18 
19 #include "stackglue.h"
20 
21 #include <linux/dlm_plock.h>
22 
23 /*
24  * The control protocol starts with a handshake.  Until the handshake
25  * is complete, the control device will fail all write(2)s.
26  *
27  * The handshake is simple.  First, the client reads until EOF.  Each line
28  * of output is a supported protocol tag.  All protocol tags are a single
29  * character followed by a two hex digit version number.  Currently the
30  * only things supported is T01, for "Text-base version 0x01".  Next, the
31  * client writes the version they would like to use, including the newline.
32  * Thus, the protocol tag is 'T01\n'.  If the version tag written is
33  * unknown, -EINVAL is returned.  Once the negotiation is complete, the
34  * client can start sending messages.
35  *
36  * The T01 protocol has three messages.  First is the "SETN" message.
37  * It has the following syntax:
38  *
39  *  SETN<space><8-char-hex-nodenum><newline>
40  *
41  * This is 14 characters.
42  *
43  * The "SETN" message must be the first message following the protocol.
44  * It tells ocfs2_control the local node number.
45  *
46  * Next comes the "SETV" message.  It has the following syntax:
47  *
48  *  SETV<space><2-char-hex-major><space><2-char-hex-minor><newline>
49  *
50  * This is 11 characters.
51  *
52  * The "SETV" message sets the filesystem locking protocol version as
53  * negotiated by the client.  The client negotiates based on the maximum
54  * version advertised in /sys/fs/ocfs2/max_locking_protocol.  The major
55  * number from the "SETV" message must match
56  * ocfs2_user_plugin.sp_max_proto.pv_major, and the minor number
57  * must be less than or equal to ...sp_max_version.pv_minor.
58  *
59  * Once this information has been set, mounts will be allowed.  From this
60  * point on, the "DOWN" message can be sent for node down notification.
61  * It has the following syntax:
62  *
63  *  DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline>
64  *
65  * eg:
66  *
67  *  DOWN 632A924FDD844190BDA93C0DF6B94899 00000001\n
68  *
69  * This is 47 characters.
70  */
71 
72 /*
73  * Whether or not the client has done the handshake.
74  * For now, we have just one protocol version.
75  */
76 #define OCFS2_CONTROL_PROTO			"T01\n"
77 #define OCFS2_CONTROL_PROTO_LEN			4
78 
79 /* Handshake states */
80 #define OCFS2_CONTROL_HANDSHAKE_INVALID		(0)
81 #define OCFS2_CONTROL_HANDSHAKE_READ		(1)
82 #define OCFS2_CONTROL_HANDSHAKE_PROTOCOL	(2)
83 #define OCFS2_CONTROL_HANDSHAKE_VALID		(3)
84 
85 /* Messages */
86 #define OCFS2_CONTROL_MESSAGE_OP_LEN		4
87 #define OCFS2_CONTROL_MESSAGE_SETNODE_OP	"SETN"
88 #define OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN	14
89 #define OCFS2_CONTROL_MESSAGE_SETVERSION_OP	"SETV"
90 #define OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN	11
91 #define OCFS2_CONTROL_MESSAGE_DOWN_OP		"DOWN"
92 #define OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN	47
93 #define OCFS2_TEXT_UUID_LEN			32
94 #define OCFS2_CONTROL_MESSAGE_VERNUM_LEN	2
95 #define OCFS2_CONTROL_MESSAGE_NODENUM_LEN	8
96 #define VERSION_LOCK				"version_lock"
97 
98 enum ocfs2_connection_type {
99 	WITH_CONTROLD,
100 	NO_CONTROLD
101 };
102 
103 /*
104  * ocfs2_live_connection is refcounted because the filesystem and
105  * miscdevice sides can detach in different order.  Let's just be safe.
106  */
107 struct ocfs2_live_connection {
108 	struct list_head		oc_list;
109 	struct ocfs2_cluster_connection	*oc_conn;
110 	enum ocfs2_connection_type	oc_type;
111 	atomic_t                        oc_this_node;
112 	int                             oc_our_slot;
113 	struct dlm_lksb                 oc_version_lksb;
114 	char                            oc_lvb[DLM_LVB_LEN];
115 	struct completion               oc_sync_wait;
116 	wait_queue_head_t		oc_wait;
117 };
118 
119 struct ocfs2_control_private {
120 	struct list_head op_list;
121 	int op_state;
122 	int op_this_node;
123 	struct ocfs2_protocol_version op_proto;
124 };
125 
126 /* SETN<space><8-char-hex-nodenum><newline> */
127 struct ocfs2_control_message_setn {
128 	char	tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
129 	char	space;
130 	char	nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN];
131 	char	newline;
132 };
133 
134 /* SETV<space><2-char-hex-major><space><2-char-hex-minor><newline> */
135 struct ocfs2_control_message_setv {
136 	char	tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
137 	char	space1;
138 	char	major[OCFS2_CONTROL_MESSAGE_VERNUM_LEN];
139 	char	space2;
140 	char	minor[OCFS2_CONTROL_MESSAGE_VERNUM_LEN];
141 	char	newline;
142 };
143 
144 /* DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline> */
145 struct ocfs2_control_message_down {
146 	char	tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
147 	char	space1;
148 	char	uuid[OCFS2_TEXT_UUID_LEN];
149 	char	space2;
150 	char	nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN];
151 	char	newline;
152 };
153 
154 union ocfs2_control_message {
155 	char					tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
156 	struct ocfs2_control_message_setn	u_setn;
157 	struct ocfs2_control_message_setv	u_setv;
158 	struct ocfs2_control_message_down	u_down;
159 };
160 
161 static struct ocfs2_stack_plugin ocfs2_user_plugin;
162 
163 static atomic_t ocfs2_control_opened;
164 static int ocfs2_control_this_node = -1;
165 static struct ocfs2_protocol_version running_proto;
166 
167 static LIST_HEAD(ocfs2_live_connection_list);
168 static LIST_HEAD(ocfs2_control_private_list);
169 static DEFINE_MUTEX(ocfs2_control_lock);
170 
171 static inline void ocfs2_control_set_handshake_state(struct file *file,
172 						     int state)
173 {
174 	struct ocfs2_control_private *p = file->private_data;
175 	p->op_state = state;
176 }
177 
178 static inline int ocfs2_control_get_handshake_state(struct file *file)
179 {
180 	struct ocfs2_control_private *p = file->private_data;
181 	return p->op_state;
182 }
183 
184 static struct ocfs2_live_connection *ocfs2_connection_find(const char *name)
185 {
186 	size_t len = strlen(name);
187 	struct ocfs2_live_connection *c;
188 
189 	BUG_ON(!mutex_is_locked(&ocfs2_control_lock));
190 
191 	list_for_each_entry(c, &ocfs2_live_connection_list, oc_list) {
192 		if ((c->oc_conn->cc_namelen == len) &&
193 		    !strncmp(c->oc_conn->cc_name, name, len))
194 			return c;
195 	}
196 
197 	return NULL;
198 }
199 
200 /*
201  * ocfs2_live_connection structures are created underneath the ocfs2
202  * mount path.  Since the VFS prevents multiple calls to
203  * fill_super(), we can't get dupes here.
204  */
205 static int ocfs2_live_connection_attach(struct ocfs2_cluster_connection *conn,
206 				     struct ocfs2_live_connection *c)
207 {
208 	int rc = 0;
209 
210 	mutex_lock(&ocfs2_control_lock);
211 	c->oc_conn = conn;
212 
213 	if ((c->oc_type == NO_CONTROLD) || atomic_read(&ocfs2_control_opened))
214 		list_add(&c->oc_list, &ocfs2_live_connection_list);
215 	else {
216 		printk(KERN_ERR
217 		       "ocfs2: Userspace control daemon is not present\n");
218 		rc = -ESRCH;
219 	}
220 
221 	mutex_unlock(&ocfs2_control_lock);
222 	return rc;
223 }
224 
225 /*
226  * This function disconnects the cluster connection from ocfs2_control.
227  * Afterwards, userspace can't affect the cluster connection.
228  */
229 static void ocfs2_live_connection_drop(struct ocfs2_live_connection *c)
230 {
231 	mutex_lock(&ocfs2_control_lock);
232 	list_del_init(&c->oc_list);
233 	c->oc_conn = NULL;
234 	mutex_unlock(&ocfs2_control_lock);
235 
236 	kfree(c);
237 }
238 
239 static int ocfs2_control_cfu(void *target, size_t target_len,
240 			     const char __user *buf, size_t count)
241 {
242 	/* The T01 expects write(2) calls to have exactly one command */
243 	if ((count != target_len) ||
244 	    (count > sizeof(union ocfs2_control_message)))
245 		return -EINVAL;
246 
247 	if (copy_from_user(target, buf, target_len))
248 		return -EFAULT;
249 
250 	return 0;
251 }
252 
253 static ssize_t ocfs2_control_validate_protocol(struct file *file,
254 					       const char __user *buf,
255 					       size_t count)
256 {
257 	ssize_t ret;
258 	char kbuf[OCFS2_CONTROL_PROTO_LEN];
259 
260 	ret = ocfs2_control_cfu(kbuf, OCFS2_CONTROL_PROTO_LEN,
261 				buf, count);
262 	if (ret)
263 		return ret;
264 
265 	if (strncmp(kbuf, OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN))
266 		return -EINVAL;
267 
268 	ocfs2_control_set_handshake_state(file,
269 					  OCFS2_CONTROL_HANDSHAKE_PROTOCOL);
270 
271 	return count;
272 }
273 
274 static void ocfs2_control_send_down(const char *uuid,
275 				    int nodenum)
276 {
277 	struct ocfs2_live_connection *c;
278 
279 	mutex_lock(&ocfs2_control_lock);
280 
281 	c = ocfs2_connection_find(uuid);
282 	if (c) {
283 		BUG_ON(c->oc_conn == NULL);
284 		c->oc_conn->cc_recovery_handler(nodenum,
285 						c->oc_conn->cc_recovery_data);
286 	}
287 
288 	mutex_unlock(&ocfs2_control_lock);
289 }
290 
291 /*
292  * Called whenever configuration elements are sent to /dev/ocfs2_control.
293  * If all configuration elements are present, try to set the global
294  * values.  If there is a problem, return an error.  Skip any missing
295  * elements, and only bump ocfs2_control_opened when we have all elements
296  * and are successful.
297  */
298 static int ocfs2_control_install_private(struct file *file)
299 {
300 	int rc = 0;
301 	int set_p = 1;
302 	struct ocfs2_control_private *p = file->private_data;
303 
304 	BUG_ON(p->op_state != OCFS2_CONTROL_HANDSHAKE_PROTOCOL);
305 
306 	mutex_lock(&ocfs2_control_lock);
307 
308 	if (p->op_this_node < 0) {
309 		set_p = 0;
310 	} else if ((ocfs2_control_this_node >= 0) &&
311 		   (ocfs2_control_this_node != p->op_this_node)) {
312 		rc = -EINVAL;
313 		goto out_unlock;
314 	}
315 
316 	if (!p->op_proto.pv_major) {
317 		set_p = 0;
318 	} else if (!list_empty(&ocfs2_live_connection_list) &&
319 		   ((running_proto.pv_major != p->op_proto.pv_major) ||
320 		    (running_proto.pv_minor != p->op_proto.pv_minor))) {
321 		rc = -EINVAL;
322 		goto out_unlock;
323 	}
324 
325 	if (set_p) {
326 		ocfs2_control_this_node = p->op_this_node;
327 		running_proto.pv_major = p->op_proto.pv_major;
328 		running_proto.pv_minor = p->op_proto.pv_minor;
329 	}
330 
331 out_unlock:
332 	mutex_unlock(&ocfs2_control_lock);
333 
334 	if (!rc && set_p) {
335 		/* We set the global values successfully */
336 		atomic_inc(&ocfs2_control_opened);
337 		ocfs2_control_set_handshake_state(file,
338 					OCFS2_CONTROL_HANDSHAKE_VALID);
339 	}
340 
341 	return rc;
342 }
343 
344 static int ocfs2_control_get_this_node(void)
345 {
346 	int rc;
347 
348 	mutex_lock(&ocfs2_control_lock);
349 	if (ocfs2_control_this_node < 0)
350 		rc = -EINVAL;
351 	else
352 		rc = ocfs2_control_this_node;
353 	mutex_unlock(&ocfs2_control_lock);
354 
355 	return rc;
356 }
357 
358 static int ocfs2_control_do_setnode_msg(struct file *file,
359 					struct ocfs2_control_message_setn *msg)
360 {
361 	long nodenum;
362 	char *ptr = NULL;
363 	struct ocfs2_control_private *p = file->private_data;
364 
365 	if (ocfs2_control_get_handshake_state(file) !=
366 	    OCFS2_CONTROL_HANDSHAKE_PROTOCOL)
367 		return -EINVAL;
368 
369 	if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP,
370 		    OCFS2_CONTROL_MESSAGE_OP_LEN))
371 		return -EINVAL;
372 
373 	if ((msg->space != ' ') || (msg->newline != '\n'))
374 		return -EINVAL;
375 	msg->space = msg->newline = '\0';
376 
377 	nodenum = simple_strtol(msg->nodestr, &ptr, 16);
378 	if (!ptr || *ptr)
379 		return -EINVAL;
380 
381 	if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) ||
382 	    (nodenum > INT_MAX) || (nodenum < 0))
383 		return -ERANGE;
384 	p->op_this_node = nodenum;
385 
386 	return ocfs2_control_install_private(file);
387 }
388 
389 static int ocfs2_control_do_setversion_msg(struct file *file,
390 					   struct ocfs2_control_message_setv *msg)
391 {
392 	long major, minor;
393 	char *ptr = NULL;
394 	struct ocfs2_control_private *p = file->private_data;
395 	struct ocfs2_protocol_version *max =
396 		&ocfs2_user_plugin.sp_max_proto;
397 
398 	if (ocfs2_control_get_handshake_state(file) !=
399 	    OCFS2_CONTROL_HANDSHAKE_PROTOCOL)
400 		return -EINVAL;
401 
402 	if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP,
403 		    OCFS2_CONTROL_MESSAGE_OP_LEN))
404 		return -EINVAL;
405 
406 	if ((msg->space1 != ' ') || (msg->space2 != ' ') ||
407 	    (msg->newline != '\n'))
408 		return -EINVAL;
409 	msg->space1 = msg->space2 = msg->newline = '\0';
410 
411 	major = simple_strtol(msg->major, &ptr, 16);
412 	if (!ptr || *ptr)
413 		return -EINVAL;
414 	minor = simple_strtol(msg->minor, &ptr, 16);
415 	if (!ptr || *ptr)
416 		return -EINVAL;
417 
418 	/*
419 	 * The major must be between 1 and 255, inclusive.  The minor
420 	 * must be between 0 and 255, inclusive.  The version passed in
421 	 * must be within the maximum version supported by the filesystem.
422 	 */
423 	if ((major == LONG_MIN) || (major == LONG_MAX) ||
424 	    (major > (u8)-1) || (major < 1))
425 		return -ERANGE;
426 	if ((minor == LONG_MIN) || (minor == LONG_MAX) ||
427 	    (minor > (u8)-1) || (minor < 0))
428 		return -ERANGE;
429 	if ((major != max->pv_major) ||
430 	    (minor > max->pv_minor))
431 		return -EINVAL;
432 
433 	p->op_proto.pv_major = major;
434 	p->op_proto.pv_minor = minor;
435 
436 	return ocfs2_control_install_private(file);
437 }
438 
439 static int ocfs2_control_do_down_msg(struct file *file,
440 				     struct ocfs2_control_message_down *msg)
441 {
442 	long nodenum;
443 	char *p = NULL;
444 
445 	if (ocfs2_control_get_handshake_state(file) !=
446 	    OCFS2_CONTROL_HANDSHAKE_VALID)
447 		return -EINVAL;
448 
449 	if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_DOWN_OP,
450 		    OCFS2_CONTROL_MESSAGE_OP_LEN))
451 		return -EINVAL;
452 
453 	if ((msg->space1 != ' ') || (msg->space2 != ' ') ||
454 	    (msg->newline != '\n'))
455 		return -EINVAL;
456 	msg->space1 = msg->space2 = msg->newline = '\0';
457 
458 	nodenum = simple_strtol(msg->nodestr, &p, 16);
459 	if (!p || *p)
460 		return -EINVAL;
461 
462 	if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) ||
463 	    (nodenum > INT_MAX) || (nodenum < 0))
464 		return -ERANGE;
465 
466 	ocfs2_control_send_down(msg->uuid, nodenum);
467 
468 	return 0;
469 }
470 
471 static ssize_t ocfs2_control_message(struct file *file,
472 				     const char __user *buf,
473 				     size_t count)
474 {
475 	ssize_t ret;
476 	union ocfs2_control_message msg;
477 
478 	/* Try to catch padding issues */
479 	WARN_ON(offsetof(struct ocfs2_control_message_down, uuid) !=
480 		(sizeof(msg.u_down.tag) + sizeof(msg.u_down.space1)));
481 
482 	memset(&msg, 0, sizeof(union ocfs2_control_message));
483 	ret = ocfs2_control_cfu(&msg, count, buf, count);
484 	if (ret)
485 		goto out;
486 
487 	if ((count == OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN) &&
488 	    !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP,
489 		     OCFS2_CONTROL_MESSAGE_OP_LEN))
490 		ret = ocfs2_control_do_setnode_msg(file, &msg.u_setn);
491 	else if ((count == OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN) &&
492 		 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP,
493 			  OCFS2_CONTROL_MESSAGE_OP_LEN))
494 		ret = ocfs2_control_do_setversion_msg(file, &msg.u_setv);
495 	else if ((count == OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN) &&
496 		 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_DOWN_OP,
497 			  OCFS2_CONTROL_MESSAGE_OP_LEN))
498 		ret = ocfs2_control_do_down_msg(file, &msg.u_down);
499 	else
500 		ret = -EINVAL;
501 
502 out:
503 	return ret ? ret : count;
504 }
505 
506 static ssize_t ocfs2_control_write(struct file *file,
507 				   const char __user *buf,
508 				   size_t count,
509 				   loff_t *ppos)
510 {
511 	ssize_t ret;
512 
513 	switch (ocfs2_control_get_handshake_state(file)) {
514 		case OCFS2_CONTROL_HANDSHAKE_INVALID:
515 			ret = -EINVAL;
516 			break;
517 
518 		case OCFS2_CONTROL_HANDSHAKE_READ:
519 			ret = ocfs2_control_validate_protocol(file, buf,
520 							      count);
521 			break;
522 
523 		case OCFS2_CONTROL_HANDSHAKE_PROTOCOL:
524 		case OCFS2_CONTROL_HANDSHAKE_VALID:
525 			ret = ocfs2_control_message(file, buf, count);
526 			break;
527 
528 		default:
529 			BUG();
530 			ret = -EIO;
531 			break;
532 	}
533 
534 	return ret;
535 }
536 
537 /*
538  * This is a naive version.  If we ever have a new protocol, we'll expand
539  * it.  Probably using seq_file.
540  */
541 static ssize_t ocfs2_control_read(struct file *file,
542 				  char __user *buf,
543 				  size_t count,
544 				  loff_t *ppos)
545 {
546 	ssize_t ret;
547 
548 	ret = simple_read_from_buffer(buf, count, ppos,
549 			OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN);
550 
551 	/* Have we read the whole protocol list? */
552 	if (ret > 0 && *ppos >= OCFS2_CONTROL_PROTO_LEN)
553 		ocfs2_control_set_handshake_state(file,
554 						  OCFS2_CONTROL_HANDSHAKE_READ);
555 
556 	return ret;
557 }
558 
559 static int ocfs2_control_release(struct inode *inode, struct file *file)
560 {
561 	struct ocfs2_control_private *p = file->private_data;
562 
563 	mutex_lock(&ocfs2_control_lock);
564 
565 	if (ocfs2_control_get_handshake_state(file) !=
566 	    OCFS2_CONTROL_HANDSHAKE_VALID)
567 		goto out;
568 
569 	if (atomic_dec_and_test(&ocfs2_control_opened)) {
570 		if (!list_empty(&ocfs2_live_connection_list)) {
571 			/* XXX: Do bad things! */
572 			printk(KERN_ERR
573 			       "ocfs2: Unexpected release of ocfs2_control!\n"
574 			       "       Loss of cluster connection requires "
575 			       "an emergency restart!\n");
576 			emergency_restart();
577 		}
578 		/*
579 		 * Last valid close clears the node number and resets
580 		 * the locking protocol version
581 		 */
582 		ocfs2_control_this_node = -1;
583 		running_proto.pv_major = 0;
584 		running_proto.pv_minor = 0;
585 	}
586 
587 out:
588 	list_del_init(&p->op_list);
589 	file->private_data = NULL;
590 
591 	mutex_unlock(&ocfs2_control_lock);
592 
593 	kfree(p);
594 
595 	return 0;
596 }
597 
598 static int ocfs2_control_open(struct inode *inode, struct file *file)
599 {
600 	struct ocfs2_control_private *p;
601 
602 	p = kzalloc(sizeof(struct ocfs2_control_private), GFP_KERNEL);
603 	if (!p)
604 		return -ENOMEM;
605 	p->op_this_node = -1;
606 
607 	mutex_lock(&ocfs2_control_lock);
608 	file->private_data = p;
609 	list_add(&p->op_list, &ocfs2_control_private_list);
610 	mutex_unlock(&ocfs2_control_lock);
611 
612 	return 0;
613 }
614 
615 static const struct file_operations ocfs2_control_fops = {
616 	.open    = ocfs2_control_open,
617 	.release = ocfs2_control_release,
618 	.read    = ocfs2_control_read,
619 	.write   = ocfs2_control_write,
620 	.owner   = THIS_MODULE,
621 	.llseek  = default_llseek,
622 };
623 
624 static struct miscdevice ocfs2_control_device = {
625 	.minor		= MISC_DYNAMIC_MINOR,
626 	.name		= "ocfs2_control",
627 	.fops		= &ocfs2_control_fops,
628 };
629 
630 static int ocfs2_control_init(void)
631 {
632 	int rc;
633 
634 	atomic_set(&ocfs2_control_opened, 0);
635 
636 	rc = misc_register(&ocfs2_control_device);
637 	if (rc)
638 		printk(KERN_ERR
639 		       "ocfs2: Unable to register ocfs2_control device "
640 		       "(errno %d)\n",
641 		       -rc);
642 
643 	return rc;
644 }
645 
646 static void ocfs2_control_exit(void)
647 {
648 	misc_deregister(&ocfs2_control_device);
649 }
650 
651 static void fsdlm_lock_ast_wrapper(void *astarg)
652 {
653 	struct ocfs2_dlm_lksb *lksb = astarg;
654 	int status = lksb->lksb_fsdlm.sb_status;
655 
656 	/*
657 	 * For now we're punting on the issue of other non-standard errors
658 	 * where we can't tell if the unlock_ast or lock_ast should be called.
659 	 * The main "other error" that's possible is EINVAL which means the
660 	 * function was called with invalid args, which shouldn't be possible
661 	 * since the caller here is under our control.  Other non-standard
662 	 * errors probably fall into the same category, or otherwise are fatal
663 	 * which means we can't carry on anyway.
664 	 */
665 
666 	if (status == -DLM_EUNLOCK || status == -DLM_ECANCEL)
667 		lksb->lksb_conn->cc_proto->lp_unlock_ast(lksb, 0);
668 	else
669 		lksb->lksb_conn->cc_proto->lp_lock_ast(lksb);
670 }
671 
672 static void fsdlm_blocking_ast_wrapper(void *astarg, int level)
673 {
674 	struct ocfs2_dlm_lksb *lksb = astarg;
675 
676 	lksb->lksb_conn->cc_proto->lp_blocking_ast(lksb, level);
677 }
678 
679 static int user_dlm_lock(struct ocfs2_cluster_connection *conn,
680 			 int mode,
681 			 struct ocfs2_dlm_lksb *lksb,
682 			 u32 flags,
683 			 void *name,
684 			 unsigned int namelen)
685 {
686 	if (!lksb->lksb_fsdlm.sb_lvbptr)
687 		lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb +
688 					     sizeof(struct dlm_lksb);
689 
690 	return dlm_lock(conn->cc_lockspace, mode, &lksb->lksb_fsdlm,
691 			flags|DLM_LKF_NODLCKWT, name, namelen, 0,
692 			fsdlm_lock_ast_wrapper, lksb,
693 			fsdlm_blocking_ast_wrapper);
694 }
695 
696 static int user_dlm_unlock(struct ocfs2_cluster_connection *conn,
697 			   struct ocfs2_dlm_lksb *lksb,
698 			   u32 flags)
699 {
700 	return dlm_unlock(conn->cc_lockspace, lksb->lksb_fsdlm.sb_lkid,
701 			  flags, &lksb->lksb_fsdlm, lksb);
702 }
703 
704 static int user_dlm_lock_status(struct ocfs2_dlm_lksb *lksb)
705 {
706 	return lksb->lksb_fsdlm.sb_status;
707 }
708 
709 static int user_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb)
710 {
711 	int invalid = lksb->lksb_fsdlm.sb_flags & DLM_SBF_VALNOTVALID;
712 
713 	return !invalid;
714 }
715 
716 static void *user_dlm_lvb(struct ocfs2_dlm_lksb *lksb)
717 {
718 	if (!lksb->lksb_fsdlm.sb_lvbptr)
719 		lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb +
720 					     sizeof(struct dlm_lksb);
721 	return (void *)(lksb->lksb_fsdlm.sb_lvbptr);
722 }
723 
724 static void user_dlm_dump_lksb(struct ocfs2_dlm_lksb *lksb)
725 {
726 }
727 
728 static int user_plock(struct ocfs2_cluster_connection *conn,
729 		      u64 ino,
730 		      struct file *file,
731 		      int cmd,
732 		      struct file_lock *fl)
733 {
734 	/*
735 	 * This more or less just demuxes the plock request into any
736 	 * one of three dlm calls.
737 	 *
738 	 * Internally, fs/dlm will pass these to a misc device, which
739 	 * a userspace daemon will read and write to.
740 	 *
741 	 * For now, cancel requests (which happen internally only),
742 	 * are turned into unlocks. Most of this function taken from
743 	 * gfs2_lock.
744 	 */
745 
746 	if (cmd == F_CANCELLK) {
747 		cmd = F_SETLK;
748 		fl->fl_type = F_UNLCK;
749 	}
750 
751 	if (IS_GETLK(cmd))
752 		return dlm_posix_get(conn->cc_lockspace, ino, file, fl);
753 	else if (fl->fl_type == F_UNLCK)
754 		return dlm_posix_unlock(conn->cc_lockspace, ino, file, fl);
755 	else
756 		return dlm_posix_lock(conn->cc_lockspace, ino, file, cmd, fl);
757 }
758 
759 /*
760  * Compare a requested locking protocol version against the current one.
761  *
762  * If the major numbers are different, they are incompatible.
763  * If the current minor is greater than the request, they are incompatible.
764  * If the current minor is less than or equal to the request, they are
765  * compatible, and the requester should run at the current minor version.
766  */
767 static int fs_protocol_compare(struct ocfs2_protocol_version *existing,
768 			       struct ocfs2_protocol_version *request)
769 {
770 	if (existing->pv_major != request->pv_major)
771 		return 1;
772 
773 	if (existing->pv_minor > request->pv_minor)
774 		return 1;
775 
776 	if (existing->pv_minor < request->pv_minor)
777 		request->pv_minor = existing->pv_minor;
778 
779 	return 0;
780 }
781 
782 static void lvb_to_version(char *lvb, struct ocfs2_protocol_version *ver)
783 {
784 	struct ocfs2_protocol_version *pv =
785 		(struct ocfs2_protocol_version *)lvb;
786 	/*
787 	 * ocfs2_protocol_version has two u8 variables, so we don't
788 	 * need any endian conversion.
789 	 */
790 	ver->pv_major = pv->pv_major;
791 	ver->pv_minor = pv->pv_minor;
792 }
793 
794 static void version_to_lvb(struct ocfs2_protocol_version *ver, char *lvb)
795 {
796 	struct ocfs2_protocol_version *pv =
797 		(struct ocfs2_protocol_version *)lvb;
798 	/*
799 	 * ocfs2_protocol_version has two u8 variables, so we don't
800 	 * need any endian conversion.
801 	 */
802 	pv->pv_major = ver->pv_major;
803 	pv->pv_minor = ver->pv_minor;
804 }
805 
806 static void sync_wait_cb(void *arg)
807 {
808 	struct ocfs2_cluster_connection *conn = arg;
809 	struct ocfs2_live_connection *lc = conn->cc_private;
810 	complete(&lc->oc_sync_wait);
811 }
812 
813 static int sync_unlock(struct ocfs2_cluster_connection *conn,
814 		struct dlm_lksb *lksb, char *name)
815 {
816 	int error;
817 	struct ocfs2_live_connection *lc = conn->cc_private;
818 
819 	error = dlm_unlock(conn->cc_lockspace, lksb->sb_lkid, 0, lksb, conn);
820 	if (error) {
821 		printk(KERN_ERR "%s lkid %x error %d\n",
822 				name, lksb->sb_lkid, error);
823 		return error;
824 	}
825 
826 	wait_for_completion(&lc->oc_sync_wait);
827 
828 	if (lksb->sb_status != -DLM_EUNLOCK) {
829 		printk(KERN_ERR "%s lkid %x status %d\n",
830 				name, lksb->sb_lkid, lksb->sb_status);
831 		return -1;
832 	}
833 	return 0;
834 }
835 
836 static int sync_lock(struct ocfs2_cluster_connection *conn,
837 		int mode, uint32_t flags,
838 		struct dlm_lksb *lksb, char *name)
839 {
840 	int error, status;
841 	struct ocfs2_live_connection *lc = conn->cc_private;
842 
843 	error = dlm_lock(conn->cc_lockspace, mode, lksb, flags,
844 			name, strlen(name),
845 			0, sync_wait_cb, conn, NULL);
846 	if (error) {
847 		printk(KERN_ERR "%s lkid %x flags %x mode %d error %d\n",
848 				name, lksb->sb_lkid, flags, mode, error);
849 		return error;
850 	}
851 
852 	wait_for_completion(&lc->oc_sync_wait);
853 
854 	status = lksb->sb_status;
855 
856 	if (status && status != -EAGAIN) {
857 		printk(KERN_ERR "%s lkid %x flags %x mode %d status %d\n",
858 				name, lksb->sb_lkid, flags, mode, status);
859 	}
860 
861 	return status;
862 }
863 
864 
865 static int version_lock(struct ocfs2_cluster_connection *conn, int mode,
866 		int flags)
867 {
868 	struct ocfs2_live_connection *lc = conn->cc_private;
869 	return sync_lock(conn, mode, flags,
870 			&lc->oc_version_lksb, VERSION_LOCK);
871 }
872 
873 static int version_unlock(struct ocfs2_cluster_connection *conn)
874 {
875 	struct ocfs2_live_connection *lc = conn->cc_private;
876 	return sync_unlock(conn, &lc->oc_version_lksb, VERSION_LOCK);
877 }
878 
879 /* get_protocol_version()
880  *
881  * To exchange ocfs2 versioning, we use the LVB of the version dlm lock.
882  * The algorithm is:
883  * 1. Attempt to take the lock in EX mode (non-blocking).
884  * 2. If successful (which means it is the first mount), write the
885  *    version number and downconvert to PR lock.
886  * 3. If unsuccessful (returns -EAGAIN), read the version from the LVB after
887  *    taking the PR lock.
888  */
889 
890 static int get_protocol_version(struct ocfs2_cluster_connection *conn)
891 {
892 	int ret;
893 	struct ocfs2_live_connection *lc = conn->cc_private;
894 	struct ocfs2_protocol_version pv;
895 
896 	running_proto.pv_major =
897 		ocfs2_user_plugin.sp_max_proto.pv_major;
898 	running_proto.pv_minor =
899 		ocfs2_user_plugin.sp_max_proto.pv_minor;
900 
901 	lc->oc_version_lksb.sb_lvbptr = lc->oc_lvb;
902 	ret = version_lock(conn, DLM_LOCK_EX,
903 			DLM_LKF_VALBLK|DLM_LKF_NOQUEUE);
904 	if (!ret) {
905 		conn->cc_version.pv_major = running_proto.pv_major;
906 		conn->cc_version.pv_minor = running_proto.pv_minor;
907 		version_to_lvb(&running_proto, lc->oc_lvb);
908 		version_lock(conn, DLM_LOCK_PR, DLM_LKF_CONVERT|DLM_LKF_VALBLK);
909 	} else if (ret == -EAGAIN) {
910 		ret = version_lock(conn, DLM_LOCK_PR, DLM_LKF_VALBLK);
911 		if (ret)
912 			goto out;
913 		lvb_to_version(lc->oc_lvb, &pv);
914 
915 		if ((pv.pv_major != running_proto.pv_major) ||
916 				(pv.pv_minor > running_proto.pv_minor)) {
917 			ret = -EINVAL;
918 			goto out;
919 		}
920 
921 		conn->cc_version.pv_major = pv.pv_major;
922 		conn->cc_version.pv_minor = pv.pv_minor;
923 	}
924 out:
925 	return ret;
926 }
927 
928 static void user_recover_prep(void *arg)
929 {
930 }
931 
932 static void user_recover_slot(void *arg, struct dlm_slot *slot)
933 {
934 	struct ocfs2_cluster_connection *conn = arg;
935 	printk(KERN_INFO "ocfs2: Node %d/%d down. Initiating recovery.\n",
936 			slot->nodeid, slot->slot);
937 	conn->cc_recovery_handler(slot->nodeid, conn->cc_recovery_data);
938 
939 }
940 
941 static void user_recover_done(void *arg, struct dlm_slot *slots,
942 		int num_slots, int our_slot,
943 		uint32_t generation)
944 {
945 	struct ocfs2_cluster_connection *conn = arg;
946 	struct ocfs2_live_connection *lc = conn->cc_private;
947 	int i;
948 
949 	for (i = 0; i < num_slots; i++)
950 		if (slots[i].slot == our_slot) {
951 			atomic_set(&lc->oc_this_node, slots[i].nodeid);
952 			break;
953 		}
954 
955 	lc->oc_our_slot = our_slot;
956 	wake_up(&lc->oc_wait);
957 }
958 
959 static const struct dlm_lockspace_ops ocfs2_ls_ops = {
960 	.recover_prep = user_recover_prep,
961 	.recover_slot = user_recover_slot,
962 	.recover_done = user_recover_done,
963 };
964 
965 static int user_cluster_disconnect(struct ocfs2_cluster_connection *conn)
966 {
967 	version_unlock(conn);
968 	dlm_release_lockspace(conn->cc_lockspace, 2);
969 	conn->cc_lockspace = NULL;
970 	ocfs2_live_connection_drop(conn->cc_private);
971 	conn->cc_private = NULL;
972 	return 0;
973 }
974 
975 static int user_cluster_connect(struct ocfs2_cluster_connection *conn)
976 {
977 	dlm_lockspace_t *fsdlm;
978 	struct ocfs2_live_connection *lc;
979 	int rc, ops_rv;
980 
981 	BUG_ON(conn == NULL);
982 
983 	lc = kzalloc(sizeof(struct ocfs2_live_connection), GFP_KERNEL);
984 	if (!lc)
985 		return -ENOMEM;
986 
987 	init_waitqueue_head(&lc->oc_wait);
988 	init_completion(&lc->oc_sync_wait);
989 	atomic_set(&lc->oc_this_node, 0);
990 	conn->cc_private = lc;
991 	lc->oc_type = NO_CONTROLD;
992 
993 	rc = dlm_new_lockspace(conn->cc_name, conn->cc_cluster_name,
994 			       DLM_LSFL_FS | DLM_LSFL_NEWEXCL, DLM_LVB_LEN,
995 			       &ocfs2_ls_ops, conn, &ops_rv, &fsdlm);
996 	if (rc) {
997 		if (rc == -EEXIST || rc == -EPROTO)
998 			printk(KERN_ERR "ocfs2: Unable to create the "
999 				"lockspace %s (%d), because a ocfs2-tools "
1000 				"program is running on this file system "
1001 				"with the same name lockspace\n",
1002 				conn->cc_name, rc);
1003 		goto out;
1004 	}
1005 
1006 	if (ops_rv == -EOPNOTSUPP) {
1007 		lc->oc_type = WITH_CONTROLD;
1008 		printk(KERN_NOTICE "ocfs2: You seem to be using an older "
1009 				"version of dlm_controld and/or ocfs2-tools."
1010 				" Please consider upgrading.\n");
1011 	} else if (ops_rv) {
1012 		rc = ops_rv;
1013 		goto out;
1014 	}
1015 	conn->cc_lockspace = fsdlm;
1016 
1017 	rc = ocfs2_live_connection_attach(conn, lc);
1018 	if (rc)
1019 		goto out;
1020 
1021 	if (lc->oc_type == NO_CONTROLD) {
1022 		rc = get_protocol_version(conn);
1023 		if (rc) {
1024 			printk(KERN_ERR "ocfs2: Could not determine"
1025 					" locking version\n");
1026 			user_cluster_disconnect(conn);
1027 			goto out;
1028 		}
1029 		wait_event(lc->oc_wait, (atomic_read(&lc->oc_this_node) > 0));
1030 	}
1031 
1032 	/*
1033 	 * running_proto must have been set before we allowed any mounts
1034 	 * to proceed.
1035 	 */
1036 	if (fs_protocol_compare(&running_proto, &conn->cc_version)) {
1037 		printk(KERN_ERR
1038 		       "Unable to mount with fs locking protocol version "
1039 		       "%u.%u because negotiated protocol is %u.%u\n",
1040 		       conn->cc_version.pv_major, conn->cc_version.pv_minor,
1041 		       running_proto.pv_major, running_proto.pv_minor);
1042 		rc = -EPROTO;
1043 		ocfs2_live_connection_drop(lc);
1044 		lc = NULL;
1045 	}
1046 
1047 out:
1048 	if (rc)
1049 		kfree(lc);
1050 	return rc;
1051 }
1052 
1053 
1054 static int user_cluster_this_node(struct ocfs2_cluster_connection *conn,
1055 				  unsigned int *this_node)
1056 {
1057 	int rc;
1058 	struct ocfs2_live_connection *lc = conn->cc_private;
1059 
1060 	if (lc->oc_type == WITH_CONTROLD)
1061 		rc = ocfs2_control_get_this_node();
1062 	else if (lc->oc_type == NO_CONTROLD)
1063 		rc = atomic_read(&lc->oc_this_node);
1064 	else
1065 		rc = -EINVAL;
1066 
1067 	if (rc < 0)
1068 		return rc;
1069 
1070 	*this_node = rc;
1071 	return 0;
1072 }
1073 
1074 static struct ocfs2_stack_operations ocfs2_user_plugin_ops = {
1075 	.connect	= user_cluster_connect,
1076 	.disconnect	= user_cluster_disconnect,
1077 	.this_node	= user_cluster_this_node,
1078 	.dlm_lock	= user_dlm_lock,
1079 	.dlm_unlock	= user_dlm_unlock,
1080 	.lock_status	= user_dlm_lock_status,
1081 	.lvb_valid	= user_dlm_lvb_valid,
1082 	.lock_lvb	= user_dlm_lvb,
1083 	.plock		= user_plock,
1084 	.dump_lksb	= user_dlm_dump_lksb,
1085 };
1086 
1087 static struct ocfs2_stack_plugin ocfs2_user_plugin = {
1088 	.sp_name	= "user",
1089 	.sp_ops		= &ocfs2_user_plugin_ops,
1090 	.sp_owner	= THIS_MODULE,
1091 };
1092 
1093 
1094 static int __init ocfs2_user_plugin_init(void)
1095 {
1096 	int rc;
1097 
1098 	rc = ocfs2_control_init();
1099 	if (!rc) {
1100 		rc = ocfs2_stack_glue_register(&ocfs2_user_plugin);
1101 		if (rc)
1102 			ocfs2_control_exit();
1103 	}
1104 
1105 	return rc;
1106 }
1107 
1108 static void __exit ocfs2_user_plugin_exit(void)
1109 {
1110 	ocfs2_stack_glue_unregister(&ocfs2_user_plugin);
1111 	ocfs2_control_exit();
1112 }
1113 
1114 MODULE_AUTHOR("Oracle");
1115 MODULE_DESCRIPTION("ocfs2 driver for userspace cluster stacks");
1116 MODULE_LICENSE("GPL");
1117 module_init(ocfs2_user_plugin_init);
1118 module_exit(ocfs2_user_plugin_exit);
1119