xref: /linux/fs/ocfs2/stack_user.c (revision 8fc4e4aa2bfca8d32e8bc2a01526ea2da450e6cb)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * stack_user.c
4  *
5  * Code which interfaces ocfs2 with fs/dlm and a userspace stack.
6  *
7  * Copyright (C) 2007 Oracle.  All rights reserved.
8  */
9 
10 #include <linux/module.h>
11 #include <linux/fs.h>
12 #include <linux/miscdevice.h>
13 #include <linux/mutex.h>
14 #include <linux/slab.h>
15 #include <linux/reboot.h>
16 #include <linux/sched.h>
17 #include <linux/uaccess.h>
18 
19 #include "stackglue.h"
20 
21 #include <linux/dlm_plock.h>
22 
23 /*
24  * The control protocol starts with a handshake.  Until the handshake
25  * is complete, the control device will fail all write(2)s.
26  *
27  * The handshake is simple.  First, the client reads until EOF.  Each line
28  * of output is a supported protocol tag.  All protocol tags are a single
29  * character followed by a two hex digit version number.  Currently the
 * only thing supported is T01, for "Text-based version 0x01".  Next, the
31  * client writes the version they would like to use, including the newline.
32  * Thus, the protocol tag is 'T01\n'.  If the version tag written is
33  * unknown, -EINVAL is returned.  Once the negotiation is complete, the
34  * client can start sending messages.
35  *
36  * The T01 protocol has three messages.  First is the "SETN" message.
37  * It has the following syntax:
38  *
39  *  SETN<space><8-char-hex-nodenum><newline>
40  *
41  * This is 14 characters.
42  *
43  * The "SETN" message must be the first message following the protocol.
44  * It tells ocfs2_control the local node number.
45  *
46  * Next comes the "SETV" message.  It has the following syntax:
47  *
48  *  SETV<space><2-char-hex-major><space><2-char-hex-minor><newline>
49  *
50  * This is 11 characters.
51  *
52  * The "SETV" message sets the filesystem locking protocol version as
53  * negotiated by the client.  The client negotiates based on the maximum
54  * version advertised in /sys/fs/ocfs2/max_locking_protocol.  The major
55  * number from the "SETV" message must match
56  * ocfs2_user_plugin.sp_max_proto.pv_major, and the minor number
 * must be less than or equal to ocfs2_user_plugin.sp_max_proto.pv_minor.
58  *
59  * Once this information has been set, mounts will be allowed.  From this
60  * point on, the "DOWN" message can be sent for node down notification.
61  * It has the following syntax:
62  *
63  *  DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline>
64  *
65  * eg:
66  *
67  *  DOWN 632A924FDD844190BDA93C0DF6B94899 00000001\n
68  *
69  * This is 47 characters.
70  */
71 
72 /*
73  * Whether or not the client has done the handshake.
74  * For now, we have just one protocol version.
75  */
/* The single protocol tag offered by read(2) and accepted by write(2). */
#define OCFS2_CONTROL_PROTO			"T01\n"
/* Number of characters in OCFS2_CONTROL_PROTO, excluding the NUL. */
#define OCFS2_CONTROL_PROTO_LEN			4

/*
 * Handshake states, stored per open file in op_state.
 *
 * INVALID  - initial state of a freshly opened file; writes are refused.
 * READ     - the client has read the whole protocol list.
 * PROTOCOL - the client has written a valid protocol tag; SETN/SETV
 *            messages are accepted.
 * VALID    - node number and protocol version are installed; DOWN
 *            messages are accepted.
 */
#define OCFS2_CONTROL_HANDSHAKE_INVALID		(0)
#define OCFS2_CONTROL_HANDSHAKE_READ		(1)
#define OCFS2_CONTROL_HANDSHAKE_PROTOCOL	(2)
#define OCFS2_CONTROL_HANDSHAKE_VALID		(3)

/*
 * Messages.  OP_LEN is the length of the four-character opcode tag; each
 * *_TOTAL_LEN is the exact write(2) size of that message, newline included.
 */
#define OCFS2_CONTROL_MESSAGE_OP_LEN		4
#define OCFS2_CONTROL_MESSAGE_SETNODE_OP	"SETN"
#define OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN	14
#define OCFS2_CONTROL_MESSAGE_SETVERSION_OP	"SETV"
#define OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN	11
#define OCFS2_CONTROL_MESSAGE_DOWN_OP		"DOWN"
#define OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN	47
/* Field widths, in characters, of the hex-encoded message fields. */
#define OCFS2_TEXT_UUID_LEN			32
#define OCFS2_CONTROL_MESSAGE_VERNUM_LEN	2
#define OCFS2_CONTROL_MESSAGE_NODENUM_LEN	8
/* Name of the DLM lock whose LVB carries the locking protocol version. */
#define VERSION_LOCK				"version_lock"
97 
/*
 * How a live connection learns its node number and protocol version.
 *
 * WITH_CONTROLD - selected when dlm_new_lockspace() reports -EOPNOTSUPP for
 *                 the lockspace callbacks; values come from the control
 *                 device.
 * NO_CONTROLD   - the default; values come from the DLM callbacks and the
 *                 version lock.
 */
enum ocfs2_connection_type {
	WITH_CONTROLD,
	NO_CONTROLD
};
102 
/*
 * Per-connection state of this stack plugin.  The filesystem and
 * miscdevice sides can detach in different order.
 *
 * NOTE(review): this structure was described as refcounted, but no
 * reference count member is visible here and it is released with a plain
 * kfree() in ocfs2_live_connection_drop() - confirm the intended lifetime.
 */
struct ocfs2_live_connection {
	/* Entry on ocfs2_live_connection_list. */
	struct list_head		oc_list;
	/* Owning cluster connection; cleared when the connection is dropped. */
	struct ocfs2_cluster_connection	*oc_conn;
	/* WITH_CONTROLD or NO_CONTROLD, decided in user_cluster_connect(). */
	enum ocfs2_connection_type	oc_type;
	/* Local node id, stored by user_recover_done(); 0 until then. */
	atomic_t                        oc_this_node;
	/* Our slot number as reported to user_recover_done(). */
	int                             oc_our_slot;
	/* Lock status block of the version lock. */
	struct dlm_lksb                 oc_version_lksb;
	/* Lock value block backing oc_version_lksb.sb_lvbptr. */
	char                            oc_lvb[DLM_LVB_LEN];
	/* Completed by sync_wait_cb() when a synchronous DLM request finishes. */
	struct completion               oc_sync_wait;
	/* Woken by user_recover_done(); user_cluster_connect() waits on it. */
	wait_queue_head_t		oc_wait;
};
118 
/* Per-open-file state of the ocfs2_control device (file->private_data). */
struct ocfs2_control_private {
	/* Entry on ocfs2_control_private_list. */
	struct list_head op_list;
	/* One of the OCFS2_CONTROL_HANDSHAKE_* states. */
	int op_state;
	/* Node number received via SETN; -1 until one has been set. */
	int op_this_node;
	/* Locking protocol version received via SETV; major 0 means unset. */
	struct ocfs2_protocol_version op_proto;
};
125 
/*
 * SETN<space><8-char-hex-nodenum><newline>
 *
 * Byte-for-byte layout of the message as written by userspace.  The
 * handler overwrites the separators with NUL so that nodestr can be
 * parsed as a C string.
 */
struct ocfs2_control_message_setn {
	char	tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
	char	space;
	char	nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN];
	char	newline;
};
133 
/*
 * SETV<space><2-char-hex-major><space><2-char-hex-minor><newline>
 *
 * Byte-for-byte layout of the message as written by userspace.  The
 * handler overwrites the separators with NUL so that major and minor can
 * each be parsed as a C string.
 */
struct ocfs2_control_message_setv {
	char	tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
	char	space1;
	char	major[OCFS2_CONTROL_MESSAGE_VERNUM_LEN];
	char	space2;
	char	minor[OCFS2_CONTROL_MESSAGE_VERNUM_LEN];
	char	newline;
};
143 
/*
 * DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline>
 *
 * Byte-for-byte layout of the message as written by userspace.  The
 * handler overwrites the separators with NUL so that uuid and nodestr
 * can each be used as a C string.
 */
struct ocfs2_control_message_down {
	char	tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
	char	space1;
	char	uuid[OCFS2_TEXT_UUID_LEN];
	char	space2;
	char	nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN];
	char	newline;
};
153 
/*
 * Buffer able to hold any one control message.  Every member starts with
 * the four-character opcode, so tag can be inspected before the message
 * type is known.  Its size is the upper bound enforced on write(2) sizes.
 */
union ocfs2_control_message {
	char					tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
	struct ocfs2_control_message_setn	u_setn;
	struct ocfs2_control_message_setv	u_setv;
	struct ocfs2_control_message_down	u_down;
};
160 
/* Forward declaration; the plugin is defined near the end of this file. */
static struct ocfs2_stack_plugin ocfs2_user_plugin;

/* Number of open control files that completed the handshake (state VALID). */
static atomic_t ocfs2_control_opened;
/* Node number installed via the control device; -1 while unset. */
static int ocfs2_control_this_node = -1;
/* Locking protocol version currently in effect for this plugin. */
static struct ocfs2_protocol_version running_proto;

/* All attached ocfs2_live_connection structures. */
static LIST_HEAD(ocfs2_live_connection_list);
/* All ocfs2_control_private structures of currently open control files. */
static LIST_HEAD(ocfs2_control_private_list);
/* Held around accesses to the two lists and ocfs2_control_this_node. */
static DEFINE_MUTEX(ocfs2_control_lock);
170 
171 static inline void ocfs2_control_set_handshake_state(struct file *file,
172 						     int state)
173 {
174 	struct ocfs2_control_private *p = file->private_data;
175 	p->op_state = state;
176 }
177 
178 static inline int ocfs2_control_get_handshake_state(struct file *file)
179 {
180 	struct ocfs2_control_private *p = file->private_data;
181 	return p->op_state;
182 }
183 
184 static struct ocfs2_live_connection *ocfs2_connection_find(const char *name)
185 {
186 	size_t len = strlen(name);
187 	struct ocfs2_live_connection *c;
188 
189 	BUG_ON(!mutex_is_locked(&ocfs2_control_lock));
190 
191 	list_for_each_entry(c, &ocfs2_live_connection_list, oc_list) {
192 		if ((c->oc_conn->cc_namelen == len) &&
193 		    !strncmp(c->oc_conn->cc_name, name, len))
194 			return c;
195 	}
196 
197 	return NULL;
198 }
199 
200 /*
201  * ocfs2_live_connection structures are created underneath the ocfs2
202  * mount path.  Since the VFS prevents multiple calls to
203  * fill_super(), we can't get dupes here.
204  */
205 static int ocfs2_live_connection_attach(struct ocfs2_cluster_connection *conn,
206 				     struct ocfs2_live_connection *c)
207 {
208 	int rc = 0;
209 
210 	mutex_lock(&ocfs2_control_lock);
211 	c->oc_conn = conn;
212 
213 	if ((c->oc_type == NO_CONTROLD) || atomic_read(&ocfs2_control_opened))
214 		list_add(&c->oc_list, &ocfs2_live_connection_list);
215 	else {
216 		printk(KERN_ERR
217 		       "ocfs2: Userspace control daemon is not present\n");
218 		rc = -ESRCH;
219 	}
220 
221 	mutex_unlock(&ocfs2_control_lock);
222 	return rc;
223 }
224 
225 /*
226  * This function disconnects the cluster connection from ocfs2_control.
227  * Afterwards, userspace can't affect the cluster connection.
228  */
229 static void ocfs2_live_connection_drop(struct ocfs2_live_connection *c)
230 {
231 	mutex_lock(&ocfs2_control_lock);
232 	list_del_init(&c->oc_list);
233 	c->oc_conn = NULL;
234 	mutex_unlock(&ocfs2_control_lock);
235 
236 	kfree(c);
237 }
238 
239 static int ocfs2_control_cfu(void *target, size_t target_len,
240 			     const char __user *buf, size_t count)
241 {
242 	/* The T01 expects write(2) calls to have exactly one command */
243 	if ((count != target_len) ||
244 	    (count > sizeof(union ocfs2_control_message)))
245 		return -EINVAL;
246 
247 	if (copy_from_user(target, buf, target_len))
248 		return -EFAULT;
249 
250 	return 0;
251 }
252 
253 static ssize_t ocfs2_control_validate_protocol(struct file *file,
254 					       const char __user *buf,
255 					       size_t count)
256 {
257 	ssize_t ret;
258 	char kbuf[OCFS2_CONTROL_PROTO_LEN];
259 
260 	ret = ocfs2_control_cfu(kbuf, OCFS2_CONTROL_PROTO_LEN,
261 				buf, count);
262 	if (ret)
263 		return ret;
264 
265 	if (strncmp(kbuf, OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN))
266 		return -EINVAL;
267 
268 	ocfs2_control_set_handshake_state(file,
269 					  OCFS2_CONTROL_HANDSHAKE_PROTOCOL);
270 
271 	return count;
272 }
273 
274 static void ocfs2_control_send_down(const char *uuid,
275 				    int nodenum)
276 {
277 	struct ocfs2_live_connection *c;
278 
279 	mutex_lock(&ocfs2_control_lock);
280 
281 	c = ocfs2_connection_find(uuid);
282 	if (c) {
283 		BUG_ON(c->oc_conn == NULL);
284 		c->oc_conn->cc_recovery_handler(nodenum,
285 						c->oc_conn->cc_recovery_data);
286 	}
287 
288 	mutex_unlock(&ocfs2_control_lock);
289 }
290 
291 /*
292  * Called whenever configuration elements are sent to /dev/ocfs2_control.
293  * If all configuration elements are present, try to set the global
294  * values.  If there is a problem, return an error.  Skip any missing
295  * elements, and only bump ocfs2_control_opened when we have all elements
296  * and are successful.
297  */
298 static int ocfs2_control_install_private(struct file *file)
299 {
300 	int rc = 0;
301 	int set_p = 1;
302 	struct ocfs2_control_private *p = file->private_data;
303 
304 	BUG_ON(p->op_state != OCFS2_CONTROL_HANDSHAKE_PROTOCOL);
305 
306 	mutex_lock(&ocfs2_control_lock);
307 
308 	if (p->op_this_node < 0) {
309 		set_p = 0;
310 	} else if ((ocfs2_control_this_node >= 0) &&
311 		   (ocfs2_control_this_node != p->op_this_node)) {
312 		rc = -EINVAL;
313 		goto out_unlock;
314 	}
315 
316 	if (!p->op_proto.pv_major) {
317 		set_p = 0;
318 	} else if (!list_empty(&ocfs2_live_connection_list) &&
319 		   ((running_proto.pv_major != p->op_proto.pv_major) ||
320 		    (running_proto.pv_minor != p->op_proto.pv_minor))) {
321 		rc = -EINVAL;
322 		goto out_unlock;
323 	}
324 
325 	if (set_p) {
326 		ocfs2_control_this_node = p->op_this_node;
327 		running_proto.pv_major = p->op_proto.pv_major;
328 		running_proto.pv_minor = p->op_proto.pv_minor;
329 	}
330 
331 out_unlock:
332 	mutex_unlock(&ocfs2_control_lock);
333 
334 	if (!rc && set_p) {
335 		/* We set the global values successfully */
336 		atomic_inc(&ocfs2_control_opened);
337 		ocfs2_control_set_handshake_state(file,
338 					OCFS2_CONTROL_HANDSHAKE_VALID);
339 	}
340 
341 	return rc;
342 }
343 
344 static int ocfs2_control_get_this_node(void)
345 {
346 	int rc;
347 
348 	mutex_lock(&ocfs2_control_lock);
349 	if (ocfs2_control_this_node < 0)
350 		rc = -EINVAL;
351 	else
352 		rc = ocfs2_control_this_node;
353 	mutex_unlock(&ocfs2_control_lock);
354 
355 	return rc;
356 }
357 
358 static int ocfs2_control_do_setnode_msg(struct file *file,
359 					struct ocfs2_control_message_setn *msg)
360 {
361 	long nodenum;
362 	char *ptr = NULL;
363 	struct ocfs2_control_private *p = file->private_data;
364 
365 	if (ocfs2_control_get_handshake_state(file) !=
366 	    OCFS2_CONTROL_HANDSHAKE_PROTOCOL)
367 		return -EINVAL;
368 
369 	if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP,
370 		    OCFS2_CONTROL_MESSAGE_OP_LEN))
371 		return -EINVAL;
372 
373 	if ((msg->space != ' ') || (msg->newline != '\n'))
374 		return -EINVAL;
375 	msg->space = msg->newline = '\0';
376 
377 	nodenum = simple_strtol(msg->nodestr, &ptr, 16);
378 	if (!ptr || *ptr)
379 		return -EINVAL;
380 
381 	if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) ||
382 	    (nodenum > INT_MAX) || (nodenum < 0))
383 		return -ERANGE;
384 	p->op_this_node = nodenum;
385 
386 	return ocfs2_control_install_private(file);
387 }
388 
389 static int ocfs2_control_do_setversion_msg(struct file *file,
390 					   struct ocfs2_control_message_setv *msg)
391 {
392 	long major, minor;
393 	char *ptr = NULL;
394 	struct ocfs2_control_private *p = file->private_data;
395 	struct ocfs2_protocol_version *max =
396 		&ocfs2_user_plugin.sp_max_proto;
397 
398 	if (ocfs2_control_get_handshake_state(file) !=
399 	    OCFS2_CONTROL_HANDSHAKE_PROTOCOL)
400 		return -EINVAL;
401 
402 	if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP,
403 		    OCFS2_CONTROL_MESSAGE_OP_LEN))
404 		return -EINVAL;
405 
406 	if ((msg->space1 != ' ') || (msg->space2 != ' ') ||
407 	    (msg->newline != '\n'))
408 		return -EINVAL;
409 	msg->space1 = msg->space2 = msg->newline = '\0';
410 
411 	major = simple_strtol(msg->major, &ptr, 16);
412 	if (!ptr || *ptr)
413 		return -EINVAL;
414 	minor = simple_strtol(msg->minor, &ptr, 16);
415 	if (!ptr || *ptr)
416 		return -EINVAL;
417 
418 	/*
419 	 * The major must be between 1 and 255, inclusive.  The minor
420 	 * must be between 0 and 255, inclusive.  The version passed in
421 	 * must be within the maximum version supported by the filesystem.
422 	 */
423 	if ((major == LONG_MIN) || (major == LONG_MAX) ||
424 	    (major > (u8)-1) || (major < 1))
425 		return -ERANGE;
426 	if ((minor == LONG_MIN) || (minor == LONG_MAX) ||
427 	    (minor > (u8)-1) || (minor < 0))
428 		return -ERANGE;
429 	if ((major != max->pv_major) ||
430 	    (minor > max->pv_minor))
431 		return -EINVAL;
432 
433 	p->op_proto.pv_major = major;
434 	p->op_proto.pv_minor = minor;
435 
436 	return ocfs2_control_install_private(file);
437 }
438 
439 static int ocfs2_control_do_down_msg(struct file *file,
440 				     struct ocfs2_control_message_down *msg)
441 {
442 	long nodenum;
443 	char *p = NULL;
444 
445 	if (ocfs2_control_get_handshake_state(file) !=
446 	    OCFS2_CONTROL_HANDSHAKE_VALID)
447 		return -EINVAL;
448 
449 	if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_DOWN_OP,
450 		    OCFS2_CONTROL_MESSAGE_OP_LEN))
451 		return -EINVAL;
452 
453 	if ((msg->space1 != ' ') || (msg->space2 != ' ') ||
454 	    (msg->newline != '\n'))
455 		return -EINVAL;
456 	msg->space1 = msg->space2 = msg->newline = '\0';
457 
458 	nodenum = simple_strtol(msg->nodestr, &p, 16);
459 	if (!p || *p)
460 		return -EINVAL;
461 
462 	if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) ||
463 	    (nodenum > INT_MAX) || (nodenum < 0))
464 		return -ERANGE;
465 
466 	ocfs2_control_send_down(msg->uuid, nodenum);
467 
468 	return 0;
469 }
470 
471 static ssize_t ocfs2_control_message(struct file *file,
472 				     const char __user *buf,
473 				     size_t count)
474 {
475 	ssize_t ret;
476 	union ocfs2_control_message msg;
477 
478 	/* Try to catch padding issues */
479 	WARN_ON(offsetof(struct ocfs2_control_message_down, uuid) !=
480 		(sizeof(msg.u_down.tag) + sizeof(msg.u_down.space1)));
481 
482 	memset(&msg, 0, sizeof(union ocfs2_control_message));
483 	ret = ocfs2_control_cfu(&msg, count, buf, count);
484 	if (ret)
485 		goto out;
486 
487 	if ((count == OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN) &&
488 	    !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP,
489 		     OCFS2_CONTROL_MESSAGE_OP_LEN))
490 		ret = ocfs2_control_do_setnode_msg(file, &msg.u_setn);
491 	else if ((count == OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN) &&
492 		 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP,
493 			  OCFS2_CONTROL_MESSAGE_OP_LEN))
494 		ret = ocfs2_control_do_setversion_msg(file, &msg.u_setv);
495 	else if ((count == OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN) &&
496 		 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_DOWN_OP,
497 			  OCFS2_CONTROL_MESSAGE_OP_LEN))
498 		ret = ocfs2_control_do_down_msg(file, &msg.u_down);
499 	else
500 		ret = -EINVAL;
501 
502 out:
503 	return ret ? ret : count;
504 }
505 
506 static ssize_t ocfs2_control_write(struct file *file,
507 				   const char __user *buf,
508 				   size_t count,
509 				   loff_t *ppos)
510 {
511 	ssize_t ret;
512 
513 	switch (ocfs2_control_get_handshake_state(file)) {
514 		case OCFS2_CONTROL_HANDSHAKE_INVALID:
515 			ret = -EINVAL;
516 			break;
517 
518 		case OCFS2_CONTROL_HANDSHAKE_READ:
519 			ret = ocfs2_control_validate_protocol(file, buf,
520 							      count);
521 			break;
522 
523 		case OCFS2_CONTROL_HANDSHAKE_PROTOCOL:
524 		case OCFS2_CONTROL_HANDSHAKE_VALID:
525 			ret = ocfs2_control_message(file, buf, count);
526 			break;
527 
528 		default:
529 			BUG();
530 			ret = -EIO;
531 			break;
532 	}
533 
534 	return ret;
535 }
536 
537 /*
538  * This is a naive version.  If we ever have a new protocol, we'll expand
539  * it.  Probably using seq_file.
540  */
541 static ssize_t ocfs2_control_read(struct file *file,
542 				  char __user *buf,
543 				  size_t count,
544 				  loff_t *ppos)
545 {
546 	ssize_t ret;
547 
548 	ret = simple_read_from_buffer(buf, count, ppos,
549 			OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN);
550 
551 	/* Have we read the whole protocol list? */
552 	if (ret > 0 && *ppos >= OCFS2_CONTROL_PROTO_LEN)
553 		ocfs2_control_set_handshake_state(file,
554 						  OCFS2_CONTROL_HANDSHAKE_READ);
555 
556 	return ret;
557 }
558 
559 static int ocfs2_control_release(struct inode *inode, struct file *file)
560 {
561 	struct ocfs2_control_private *p = file->private_data;
562 
563 	mutex_lock(&ocfs2_control_lock);
564 
565 	if (ocfs2_control_get_handshake_state(file) !=
566 	    OCFS2_CONTROL_HANDSHAKE_VALID)
567 		goto out;
568 
569 	if (atomic_dec_and_test(&ocfs2_control_opened)) {
570 		if (!list_empty(&ocfs2_live_connection_list)) {
571 			/* XXX: Do bad things! */
572 			printk(KERN_ERR
573 			       "ocfs2: Unexpected release of ocfs2_control!\n"
574 			       "       Loss of cluster connection requires "
575 			       "an emergency restart!\n");
576 			emergency_restart();
577 		}
578 		/*
579 		 * Last valid close clears the node number and resets
580 		 * the locking protocol version
581 		 */
582 		ocfs2_control_this_node = -1;
583 		running_proto.pv_major = 0;
584 		running_proto.pv_minor = 0;
585 	}
586 
587 out:
588 	list_del_init(&p->op_list);
589 	file->private_data = NULL;
590 
591 	mutex_unlock(&ocfs2_control_lock);
592 
593 	kfree(p);
594 
595 	return 0;
596 }
597 
598 static int ocfs2_control_open(struct inode *inode, struct file *file)
599 {
600 	struct ocfs2_control_private *p;
601 
602 	p = kzalloc(sizeof(struct ocfs2_control_private), GFP_KERNEL);
603 	if (!p)
604 		return -ENOMEM;
605 	p->op_this_node = -1;
606 
607 	mutex_lock(&ocfs2_control_lock);
608 	file->private_data = p;
609 	list_add(&p->op_list, &ocfs2_control_private_list);
610 	mutex_unlock(&ocfs2_control_lock);
611 
612 	return 0;
613 }
614 
/* File operations backing the ocfs2_control misc device. */
static const struct file_operations ocfs2_control_fops = {
	.open    = ocfs2_control_open,
	.release = ocfs2_control_release,
	.read    = ocfs2_control_read,
	.write   = ocfs2_control_write,
	.owner   = THIS_MODULE,
	.llseek  = default_llseek,
};
623 
/* The "ocfs2_control" misc device; its minor number is assigned dynamically. */
static struct miscdevice ocfs2_control_device = {
	.minor		= MISC_DYNAMIC_MINOR,
	.name		= "ocfs2_control",
	.fops		= &ocfs2_control_fops,
};
629 
630 static int ocfs2_control_init(void)
631 {
632 	int rc;
633 
634 	atomic_set(&ocfs2_control_opened, 0);
635 
636 	rc = misc_register(&ocfs2_control_device);
637 	if (rc)
638 		printk(KERN_ERR
639 		       "ocfs2: Unable to register ocfs2_control device "
640 		       "(errno %d)\n",
641 		       -rc);
642 
643 	return rc;
644 }
645 
646 static void ocfs2_control_exit(void)
647 {
648 	misc_deregister(&ocfs2_control_device);
649 }
650 
651 static void fsdlm_lock_ast_wrapper(void *astarg)
652 {
653 	struct ocfs2_dlm_lksb *lksb = astarg;
654 	int status = lksb->lksb_fsdlm.sb_status;
655 
656 	/*
657 	 * For now we're punting on the issue of other non-standard errors
658 	 * where we can't tell if the unlock_ast or lock_ast should be called.
659 	 * The main "other error" that's possible is EINVAL which means the
660 	 * function was called with invalid args, which shouldn't be possible
661 	 * since the caller here is under our control.  Other non-standard
662 	 * errors probably fall into the same category, or otherwise are fatal
663 	 * which means we can't carry on anyway.
664 	 */
665 
666 	if (status == -DLM_EUNLOCK || status == -DLM_ECANCEL)
667 		lksb->lksb_conn->cc_proto->lp_unlock_ast(lksb, 0);
668 	else
669 		lksb->lksb_conn->cc_proto->lp_lock_ast(lksb);
670 }
671 
672 static void fsdlm_blocking_ast_wrapper(void *astarg, int level)
673 {
674 	struct ocfs2_dlm_lksb *lksb = astarg;
675 
676 	lksb->lksb_conn->cc_proto->lp_blocking_ast(lksb, level);
677 }
678 
679 static int user_dlm_lock(struct ocfs2_cluster_connection *conn,
680 			 int mode,
681 			 struct ocfs2_dlm_lksb *lksb,
682 			 u32 flags,
683 			 void *name,
684 			 unsigned int namelen)
685 {
686 	int ret;
687 
688 	if (!lksb->lksb_fsdlm.sb_lvbptr)
689 		lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb +
690 					     sizeof(struct dlm_lksb);
691 
692 	ret = dlm_lock(conn->cc_lockspace, mode, &lksb->lksb_fsdlm,
693 		       flags|DLM_LKF_NODLCKWT, name, namelen, 0,
694 		       fsdlm_lock_ast_wrapper, lksb,
695 		       fsdlm_blocking_ast_wrapper);
696 	return ret;
697 }
698 
699 static int user_dlm_unlock(struct ocfs2_cluster_connection *conn,
700 			   struct ocfs2_dlm_lksb *lksb,
701 			   u32 flags)
702 {
703 	int ret;
704 
705 	ret = dlm_unlock(conn->cc_lockspace, lksb->lksb_fsdlm.sb_lkid,
706 			 flags, &lksb->lksb_fsdlm, lksb);
707 	return ret;
708 }
709 
710 static int user_dlm_lock_status(struct ocfs2_dlm_lksb *lksb)
711 {
712 	return lksb->lksb_fsdlm.sb_status;
713 }
714 
715 static int user_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb)
716 {
717 	int invalid = lksb->lksb_fsdlm.sb_flags & DLM_SBF_VALNOTVALID;
718 
719 	return !invalid;
720 }
721 
722 static void *user_dlm_lvb(struct ocfs2_dlm_lksb *lksb)
723 {
724 	if (!lksb->lksb_fsdlm.sb_lvbptr)
725 		lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb +
726 					     sizeof(struct dlm_lksb);
727 	return (void *)(lksb->lksb_fsdlm.sb_lvbptr);
728 }
729 
/* Stack operation: dump @lksb for debugging.  Intentionally a no-op here. */
static void user_dlm_dump_lksb(struct ocfs2_dlm_lksb *lksb)
{
}
733 
734 static int user_plock(struct ocfs2_cluster_connection *conn,
735 		      u64 ino,
736 		      struct file *file,
737 		      int cmd,
738 		      struct file_lock *fl)
739 {
740 	/*
741 	 * This more or less just demuxes the plock request into any
742 	 * one of three dlm calls.
743 	 *
744 	 * Internally, fs/dlm will pass these to a misc device, which
745 	 * a userspace daemon will read and write to.
746 	 *
747 	 * For now, cancel requests (which happen internally only),
748 	 * are turned into unlocks. Most of this function taken from
749 	 * gfs2_lock.
750 	 */
751 
752 	if (cmd == F_CANCELLK) {
753 		cmd = F_SETLK;
754 		fl->fl_type = F_UNLCK;
755 	}
756 
757 	if (IS_GETLK(cmd))
758 		return dlm_posix_get(conn->cc_lockspace, ino, file, fl);
759 	else if (fl->fl_type == F_UNLCK)
760 		return dlm_posix_unlock(conn->cc_lockspace, ino, file, fl);
761 	else
762 		return dlm_posix_lock(conn->cc_lockspace, ino, file, cmd, fl);
763 }
764 
765 /*
766  * Compare a requested locking protocol version against the current one.
767  *
768  * If the major numbers are different, they are incompatible.
769  * If the current minor is greater than the request, they are incompatible.
770  * If the current minor is less than or equal to the request, they are
771  * compatible, and the requester should run at the current minor version.
772  */
773 static int fs_protocol_compare(struct ocfs2_protocol_version *existing,
774 			       struct ocfs2_protocol_version *request)
775 {
776 	if (existing->pv_major != request->pv_major)
777 		return 1;
778 
779 	if (existing->pv_minor > request->pv_minor)
780 		return 1;
781 
782 	if (existing->pv_minor < request->pv_minor)
783 		request->pv_minor = existing->pv_minor;
784 
785 	return 0;
786 }
787 
788 static void lvb_to_version(char *lvb, struct ocfs2_protocol_version *ver)
789 {
790 	struct ocfs2_protocol_version *pv =
791 		(struct ocfs2_protocol_version *)lvb;
792 	/*
793 	 * ocfs2_protocol_version has two u8 variables, so we don't
794 	 * need any endian conversion.
795 	 */
796 	ver->pv_major = pv->pv_major;
797 	ver->pv_minor = pv->pv_minor;
798 }
799 
800 static void version_to_lvb(struct ocfs2_protocol_version *ver, char *lvb)
801 {
802 	struct ocfs2_protocol_version *pv =
803 		(struct ocfs2_protocol_version *)lvb;
804 	/*
805 	 * ocfs2_protocol_version has two u8 variables, so we don't
806 	 * need any endian conversion.
807 	 */
808 	pv->pv_major = ver->pv_major;
809 	pv->pv_minor = ver->pv_minor;
810 }
811 
812 static void sync_wait_cb(void *arg)
813 {
814 	struct ocfs2_cluster_connection *conn = arg;
815 	struct ocfs2_live_connection *lc = conn->cc_private;
816 	complete(&lc->oc_sync_wait);
817 }
818 
819 static int sync_unlock(struct ocfs2_cluster_connection *conn,
820 		struct dlm_lksb *lksb, char *name)
821 {
822 	int error;
823 	struct ocfs2_live_connection *lc = conn->cc_private;
824 
825 	error = dlm_unlock(conn->cc_lockspace, lksb->sb_lkid, 0, lksb, conn);
826 	if (error) {
827 		printk(KERN_ERR "%s lkid %x error %d\n",
828 				name, lksb->sb_lkid, error);
829 		return error;
830 	}
831 
832 	wait_for_completion(&lc->oc_sync_wait);
833 
834 	if (lksb->sb_status != -DLM_EUNLOCK) {
835 		printk(KERN_ERR "%s lkid %x status %d\n",
836 				name, lksb->sb_lkid, lksb->sb_status);
837 		return -1;
838 	}
839 	return 0;
840 }
841 
842 static int sync_lock(struct ocfs2_cluster_connection *conn,
843 		int mode, uint32_t flags,
844 		struct dlm_lksb *lksb, char *name)
845 {
846 	int error, status;
847 	struct ocfs2_live_connection *lc = conn->cc_private;
848 
849 	error = dlm_lock(conn->cc_lockspace, mode, lksb, flags,
850 			name, strlen(name),
851 			0, sync_wait_cb, conn, NULL);
852 	if (error) {
853 		printk(KERN_ERR "%s lkid %x flags %x mode %d error %d\n",
854 				name, lksb->sb_lkid, flags, mode, error);
855 		return error;
856 	}
857 
858 	wait_for_completion(&lc->oc_sync_wait);
859 
860 	status = lksb->sb_status;
861 
862 	if (status && status != -EAGAIN) {
863 		printk(KERN_ERR "%s lkid %x flags %x mode %d status %d\n",
864 				name, lksb->sb_lkid, flags, mode, status);
865 	}
866 
867 	return status;
868 }
869 
870 
871 static int version_lock(struct ocfs2_cluster_connection *conn, int mode,
872 		int flags)
873 {
874 	struct ocfs2_live_connection *lc = conn->cc_private;
875 	return sync_lock(conn, mode, flags,
876 			&lc->oc_version_lksb, VERSION_LOCK);
877 }
878 
879 static int version_unlock(struct ocfs2_cluster_connection *conn)
880 {
881 	struct ocfs2_live_connection *lc = conn->cc_private;
882 	return sync_unlock(conn, &lc->oc_version_lksb, VERSION_LOCK);
883 }
884 
885 /* get_protocol_version()
886  *
887  * To exchange ocfs2 versioning, we use the LVB of the version dlm lock.
888  * The algorithm is:
889  * 1. Attempt to take the lock in EX mode (non-blocking).
890  * 2. If successful (which means it is the first mount), write the
891  *    version number and downconvert to PR lock.
892  * 3. If unsuccessful (returns -EAGAIN), read the version from the LVB after
893  *    taking the PR lock.
894  */
895 
896 static int get_protocol_version(struct ocfs2_cluster_connection *conn)
897 {
898 	int ret;
899 	struct ocfs2_live_connection *lc = conn->cc_private;
900 	struct ocfs2_protocol_version pv;
901 
902 	running_proto.pv_major =
903 		ocfs2_user_plugin.sp_max_proto.pv_major;
904 	running_proto.pv_minor =
905 		ocfs2_user_plugin.sp_max_proto.pv_minor;
906 
907 	lc->oc_version_lksb.sb_lvbptr = lc->oc_lvb;
908 	ret = version_lock(conn, DLM_LOCK_EX,
909 			DLM_LKF_VALBLK|DLM_LKF_NOQUEUE);
910 	if (!ret) {
911 		conn->cc_version.pv_major = running_proto.pv_major;
912 		conn->cc_version.pv_minor = running_proto.pv_minor;
913 		version_to_lvb(&running_proto, lc->oc_lvb);
914 		version_lock(conn, DLM_LOCK_PR, DLM_LKF_CONVERT|DLM_LKF_VALBLK);
915 	} else if (ret == -EAGAIN) {
916 		ret = version_lock(conn, DLM_LOCK_PR, DLM_LKF_VALBLK);
917 		if (ret)
918 			goto out;
919 		lvb_to_version(lc->oc_lvb, &pv);
920 
921 		if ((pv.pv_major != running_proto.pv_major) ||
922 				(pv.pv_minor > running_proto.pv_minor)) {
923 			ret = -EINVAL;
924 			goto out;
925 		}
926 
927 		conn->cc_version.pv_major = pv.pv_major;
928 		conn->cc_version.pv_minor = pv.pv_minor;
929 	}
930 out:
931 	return ret;
932 }
933 
/* Lockspace callback run before recovery; nothing to do for ocfs2. */
static void user_recover_prep(void *arg)
{
}
937 
938 static void user_recover_slot(void *arg, struct dlm_slot *slot)
939 {
940 	struct ocfs2_cluster_connection *conn = arg;
941 	printk(KERN_INFO "ocfs2: Node %d/%d down. Initiating recovery.\n",
942 			slot->nodeid, slot->slot);
943 	conn->cc_recovery_handler(slot->nodeid, conn->cc_recovery_data);
944 
945 }
946 
947 static void user_recover_done(void *arg, struct dlm_slot *slots,
948 		int num_slots, int our_slot,
949 		uint32_t generation)
950 {
951 	struct ocfs2_cluster_connection *conn = arg;
952 	struct ocfs2_live_connection *lc = conn->cc_private;
953 	int i;
954 
955 	for (i = 0; i < num_slots; i++)
956 		if (slots[i].slot == our_slot) {
957 			atomic_set(&lc->oc_this_node, slots[i].nodeid);
958 			break;
959 		}
960 
961 	lc->oc_our_slot = our_slot;
962 	wake_up(&lc->oc_wait);
963 }
964 
/* Recovery callbacks passed to dlm_new_lockspace() for every connection. */
static const struct dlm_lockspace_ops ocfs2_ls_ops = {
	.recover_prep = user_recover_prep,
	.recover_slot = user_recover_slot,
	.recover_done = user_recover_done,
};
970 
971 static int user_cluster_disconnect(struct ocfs2_cluster_connection *conn)
972 {
973 	version_unlock(conn);
974 	dlm_release_lockspace(conn->cc_lockspace, 2);
975 	conn->cc_lockspace = NULL;
976 	ocfs2_live_connection_drop(conn->cc_private);
977 	conn->cc_private = NULL;
978 	return 0;
979 }
980 
981 static int user_cluster_connect(struct ocfs2_cluster_connection *conn)
982 {
983 	dlm_lockspace_t *fsdlm;
984 	struct ocfs2_live_connection *lc;
985 	int rc, ops_rv;
986 
987 	BUG_ON(conn == NULL);
988 
989 	lc = kzalloc(sizeof(struct ocfs2_live_connection), GFP_KERNEL);
990 	if (!lc)
991 		return -ENOMEM;
992 
993 	init_waitqueue_head(&lc->oc_wait);
994 	init_completion(&lc->oc_sync_wait);
995 	atomic_set(&lc->oc_this_node, 0);
996 	conn->cc_private = lc;
997 	lc->oc_type = NO_CONTROLD;
998 
999 	rc = dlm_new_lockspace(conn->cc_name, conn->cc_cluster_name,
1000 			       DLM_LSFL_FS | DLM_LSFL_NEWEXCL, DLM_LVB_LEN,
1001 			       &ocfs2_ls_ops, conn, &ops_rv, &fsdlm);
1002 	if (rc) {
1003 		if (rc == -EEXIST || rc == -EPROTO)
1004 			printk(KERN_ERR "ocfs2: Unable to create the "
1005 				"lockspace %s (%d), because a ocfs2-tools "
1006 				"program is running on this file system "
1007 				"with the same name lockspace\n",
1008 				conn->cc_name, rc);
1009 		goto out;
1010 	}
1011 
1012 	if (ops_rv == -EOPNOTSUPP) {
1013 		lc->oc_type = WITH_CONTROLD;
1014 		printk(KERN_NOTICE "ocfs2: You seem to be using an older "
1015 				"version of dlm_controld and/or ocfs2-tools."
1016 				" Please consider upgrading.\n");
1017 	} else if (ops_rv) {
1018 		rc = ops_rv;
1019 		goto out;
1020 	}
1021 	conn->cc_lockspace = fsdlm;
1022 
1023 	rc = ocfs2_live_connection_attach(conn, lc);
1024 	if (rc)
1025 		goto out;
1026 
1027 	if (lc->oc_type == NO_CONTROLD) {
1028 		rc = get_protocol_version(conn);
1029 		if (rc) {
1030 			printk(KERN_ERR "ocfs2: Could not determine"
1031 					" locking version\n");
1032 			user_cluster_disconnect(conn);
1033 			goto out;
1034 		}
1035 		wait_event(lc->oc_wait, (atomic_read(&lc->oc_this_node) > 0));
1036 	}
1037 
1038 	/*
1039 	 * running_proto must have been set before we allowed any mounts
1040 	 * to proceed.
1041 	 */
1042 	if (fs_protocol_compare(&running_proto, &conn->cc_version)) {
1043 		printk(KERN_ERR
1044 		       "Unable to mount with fs locking protocol version "
1045 		       "%u.%u because negotiated protocol is %u.%u\n",
1046 		       conn->cc_version.pv_major, conn->cc_version.pv_minor,
1047 		       running_proto.pv_major, running_proto.pv_minor);
1048 		rc = -EPROTO;
1049 		ocfs2_live_connection_drop(lc);
1050 		lc = NULL;
1051 	}
1052 
1053 out:
1054 	if (rc)
1055 		kfree(lc);
1056 	return rc;
1057 }
1058 
1059 
1060 static int user_cluster_this_node(struct ocfs2_cluster_connection *conn,
1061 				  unsigned int *this_node)
1062 {
1063 	int rc;
1064 	struct ocfs2_live_connection *lc = conn->cc_private;
1065 
1066 	if (lc->oc_type == WITH_CONTROLD)
1067 		rc = ocfs2_control_get_this_node();
1068 	else if (lc->oc_type == NO_CONTROLD)
1069 		rc = atomic_read(&lc->oc_this_node);
1070 	else
1071 		rc = -EINVAL;
1072 
1073 	if (rc < 0)
1074 		return rc;
1075 
1076 	*this_node = rc;
1077 	return 0;
1078 }
1079 
/* Stack operations implemented by this plugin on top of fs/dlm. */
static struct ocfs2_stack_operations ocfs2_user_plugin_ops = {
	.connect	= user_cluster_connect,
	.disconnect	= user_cluster_disconnect,
	.this_node	= user_cluster_this_node,
	.dlm_lock	= user_dlm_lock,
	.dlm_unlock	= user_dlm_unlock,
	.lock_status	= user_dlm_lock_status,
	.lvb_valid	= user_dlm_lvb_valid,
	.lock_lvb	= user_dlm_lvb,
	.plock		= user_plock,
	.dump_lksb	= user_dlm_dump_lksb,
};
1092 
/*
 * The "user" stack plugin registered with the ocfs2 stack glue.  Its
 * sp_max_proto member is not initialised here; this file only reads it.
 */
static struct ocfs2_stack_plugin ocfs2_user_plugin = {
	.sp_name	= "user",
	.sp_ops		= &ocfs2_user_plugin_ops,
	.sp_owner	= THIS_MODULE,
};
1098 
1099 
1100 static int __init ocfs2_user_plugin_init(void)
1101 {
1102 	int rc;
1103 
1104 	rc = ocfs2_control_init();
1105 	if (!rc) {
1106 		rc = ocfs2_stack_glue_register(&ocfs2_user_plugin);
1107 		if (rc)
1108 			ocfs2_control_exit();
1109 	}
1110 
1111 	return rc;
1112 }
1113 
1114 static void __exit ocfs2_user_plugin_exit(void)
1115 {
1116 	ocfs2_stack_glue_unregister(&ocfs2_user_plugin);
1117 	ocfs2_control_exit();
1118 }
1119 
/* Module metadata and entry points. */
MODULE_AUTHOR("Oracle");
MODULE_DESCRIPTION("ocfs2 driver for userspace cluster stacks");
MODULE_LICENSE("GPL");
module_init(ocfs2_user_plugin_init);
module_exit(ocfs2_user_plugin_exit);
1125