xref: /linux/fs/ocfs2/stack_user.c (revision ebf68996de0ab250c5d520eb2291ab65643e9a1e)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /* -*- mode: c; c-basic-offset: 8; -*-
3  * vim: noexpandtab sw=8 ts=8 sts=0:
4  *
5  * stack_user.c
6  *
7  * Code which interfaces ocfs2 with fs/dlm and a userspace stack.
8  *
9  * Copyright (C) 2007 Oracle.  All rights reserved.
10  */
11 
12 #include <linux/module.h>
13 #include <linux/fs.h>
14 #include <linux/miscdevice.h>
15 #include <linux/mutex.h>
16 #include <linux/slab.h>
17 #include <linux/reboot.h>
18 #include <linux/sched.h>
19 #include <linux/uaccess.h>
20 
21 #include "stackglue.h"
22 
23 #include <linux/dlm_plock.h>
24 
25 /*
26  * The control protocol starts with a handshake.  Until the handshake
27  * is complete, the control device will fail all write(2)s.
28  *
29  * The handshake is simple.  First, the client reads until EOF.  Each line
30  * of output is a supported protocol tag.  All protocol tags are a single
31  * character followed by a two hex digit version number.  Currently the
32  * only things supported is T01, for "Text-base version 0x01".  Next, the
33  * client writes the version they would like to use, including the newline.
34  * Thus, the protocol tag is 'T01\n'.  If the version tag written is
35  * unknown, -EINVAL is returned.  Once the negotiation is complete, the
36  * client can start sending messages.
37  *
38  * The T01 protocol has three messages.  First is the "SETN" message.
39  * It has the following syntax:
40  *
41  *  SETN<space><8-char-hex-nodenum><newline>
42  *
43  * This is 14 characters.
44  *
45  * The "SETN" message must be the first message following the protocol.
46  * It tells ocfs2_control the local node number.
47  *
48  * Next comes the "SETV" message.  It has the following syntax:
49  *
50  *  SETV<space><2-char-hex-major><space><2-char-hex-minor><newline>
51  *
52  * This is 11 characters.
53  *
54  * The "SETV" message sets the filesystem locking protocol version as
55  * negotiated by the client.  The client negotiates based on the maximum
56  * version advertised in /sys/fs/ocfs2/max_locking_protocol.  The major
57  * number from the "SETV" message must match
58  * ocfs2_user_plugin.sp_max_proto.pv_major, and the minor number
59  * must be less than or equal to ...sp_max_version.pv_minor.
60  *
61  * Once this information has been set, mounts will be allowed.  From this
62  * point on, the "DOWN" message can be sent for node down notification.
63  * It has the following syntax:
64  *
65  *  DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline>
66  *
67  * eg:
68  *
69  *  DOWN 632A924FDD844190BDA93C0DF6B94899 00000001\n
70  *
71  * This is 47 characters.
72  */
73 
74 /*
75  * Whether or not the client has done the handshake.
76  * For now, we have just one protocol version.
77  */
78 #define OCFS2_CONTROL_PROTO			"T01\n"
79 #define OCFS2_CONTROL_PROTO_LEN			4
80 
81 /* Handshake states */
82 #define OCFS2_CONTROL_HANDSHAKE_INVALID		(0)
83 #define OCFS2_CONTROL_HANDSHAKE_READ		(1)
84 #define OCFS2_CONTROL_HANDSHAKE_PROTOCOL	(2)
85 #define OCFS2_CONTROL_HANDSHAKE_VALID		(3)
86 
87 /* Messages */
88 #define OCFS2_CONTROL_MESSAGE_OP_LEN		4
89 #define OCFS2_CONTROL_MESSAGE_SETNODE_OP	"SETN"
90 #define OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN	14
91 #define OCFS2_CONTROL_MESSAGE_SETVERSION_OP	"SETV"
92 #define OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN	11
93 #define OCFS2_CONTROL_MESSAGE_DOWN_OP		"DOWN"
94 #define OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN	47
95 #define OCFS2_TEXT_UUID_LEN			32
96 #define OCFS2_CONTROL_MESSAGE_VERNUM_LEN	2
97 #define OCFS2_CONTROL_MESSAGE_NODENUM_LEN	8
98 #define VERSION_LOCK				"version_lock"
99 
100 enum ocfs2_connection_type {
101 	WITH_CONTROLD,
102 	NO_CONTROLD
103 };
104 
105 /*
106  * ocfs2_live_connection is refcounted because the filesystem and
107  * miscdevice sides can detach in different order.  Let's just be safe.
108  */
109 struct ocfs2_live_connection {
110 	struct list_head		oc_list;
111 	struct ocfs2_cluster_connection	*oc_conn;
112 	enum ocfs2_connection_type	oc_type;
113 	atomic_t                        oc_this_node;
114 	int                             oc_our_slot;
115 	struct dlm_lksb                 oc_version_lksb;
116 	char                            oc_lvb[DLM_LVB_LEN];
117 	struct completion               oc_sync_wait;
118 	wait_queue_head_t		oc_wait;
119 };
120 
121 struct ocfs2_control_private {
122 	struct list_head op_list;
123 	int op_state;
124 	int op_this_node;
125 	struct ocfs2_protocol_version op_proto;
126 };
127 
128 /* SETN<space><8-char-hex-nodenum><newline> */
129 struct ocfs2_control_message_setn {
130 	char	tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
131 	char	space;
132 	char	nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN];
133 	char	newline;
134 };
135 
136 /* SETV<space><2-char-hex-major><space><2-char-hex-minor><newline> */
137 struct ocfs2_control_message_setv {
138 	char	tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
139 	char	space1;
140 	char	major[OCFS2_CONTROL_MESSAGE_VERNUM_LEN];
141 	char	space2;
142 	char	minor[OCFS2_CONTROL_MESSAGE_VERNUM_LEN];
143 	char	newline;
144 };
145 
146 /* DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline> */
147 struct ocfs2_control_message_down {
148 	char	tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
149 	char	space1;
150 	char	uuid[OCFS2_TEXT_UUID_LEN];
151 	char	space2;
152 	char	nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN];
153 	char	newline;
154 };
155 
156 union ocfs2_control_message {
157 	char					tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
158 	struct ocfs2_control_message_setn	u_setn;
159 	struct ocfs2_control_message_setv	u_setv;
160 	struct ocfs2_control_message_down	u_down;
161 };
162 
163 static struct ocfs2_stack_plugin ocfs2_user_plugin;
164 
165 static atomic_t ocfs2_control_opened;
166 static int ocfs2_control_this_node = -1;
167 static struct ocfs2_protocol_version running_proto;
168 
169 static LIST_HEAD(ocfs2_live_connection_list);
170 static LIST_HEAD(ocfs2_control_private_list);
171 static DEFINE_MUTEX(ocfs2_control_lock);
172 
173 static inline void ocfs2_control_set_handshake_state(struct file *file,
174 						     int state)
175 {
176 	struct ocfs2_control_private *p = file->private_data;
177 	p->op_state = state;
178 }
179 
180 static inline int ocfs2_control_get_handshake_state(struct file *file)
181 {
182 	struct ocfs2_control_private *p = file->private_data;
183 	return p->op_state;
184 }
185 
186 static struct ocfs2_live_connection *ocfs2_connection_find(const char *name)
187 {
188 	size_t len = strlen(name);
189 	struct ocfs2_live_connection *c;
190 
191 	BUG_ON(!mutex_is_locked(&ocfs2_control_lock));
192 
193 	list_for_each_entry(c, &ocfs2_live_connection_list, oc_list) {
194 		if ((c->oc_conn->cc_namelen == len) &&
195 		    !strncmp(c->oc_conn->cc_name, name, len))
196 			return c;
197 	}
198 
199 	return NULL;
200 }
201 
202 /*
203  * ocfs2_live_connection structures are created underneath the ocfs2
204  * mount path.  Since the VFS prevents multiple calls to
205  * fill_super(), we can't get dupes here.
206  */
207 static int ocfs2_live_connection_attach(struct ocfs2_cluster_connection *conn,
208 				     struct ocfs2_live_connection *c)
209 {
210 	int rc = 0;
211 
212 	mutex_lock(&ocfs2_control_lock);
213 	c->oc_conn = conn;
214 
215 	if ((c->oc_type == NO_CONTROLD) || atomic_read(&ocfs2_control_opened))
216 		list_add(&c->oc_list, &ocfs2_live_connection_list);
217 	else {
218 		printk(KERN_ERR
219 		       "ocfs2: Userspace control daemon is not present\n");
220 		rc = -ESRCH;
221 	}
222 
223 	mutex_unlock(&ocfs2_control_lock);
224 	return rc;
225 }
226 
227 /*
228  * This function disconnects the cluster connection from ocfs2_control.
229  * Afterwards, userspace can't affect the cluster connection.
230  */
231 static void ocfs2_live_connection_drop(struct ocfs2_live_connection *c)
232 {
233 	mutex_lock(&ocfs2_control_lock);
234 	list_del_init(&c->oc_list);
235 	c->oc_conn = NULL;
236 	mutex_unlock(&ocfs2_control_lock);
237 
238 	kfree(c);
239 }
240 
241 static int ocfs2_control_cfu(void *target, size_t target_len,
242 			     const char __user *buf, size_t count)
243 {
244 	/* The T01 expects write(2) calls to have exactly one command */
245 	if ((count != target_len) ||
246 	    (count > sizeof(union ocfs2_control_message)))
247 		return -EINVAL;
248 
249 	if (copy_from_user(target, buf, target_len))
250 		return -EFAULT;
251 
252 	return 0;
253 }
254 
255 static ssize_t ocfs2_control_validate_protocol(struct file *file,
256 					       const char __user *buf,
257 					       size_t count)
258 {
259 	ssize_t ret;
260 	char kbuf[OCFS2_CONTROL_PROTO_LEN];
261 
262 	ret = ocfs2_control_cfu(kbuf, OCFS2_CONTROL_PROTO_LEN,
263 				buf, count);
264 	if (ret)
265 		return ret;
266 
267 	if (strncmp(kbuf, OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN))
268 		return -EINVAL;
269 
270 	ocfs2_control_set_handshake_state(file,
271 					  OCFS2_CONTROL_HANDSHAKE_PROTOCOL);
272 
273 	return count;
274 }
275 
276 static void ocfs2_control_send_down(const char *uuid,
277 				    int nodenum)
278 {
279 	struct ocfs2_live_connection *c;
280 
281 	mutex_lock(&ocfs2_control_lock);
282 
283 	c = ocfs2_connection_find(uuid);
284 	if (c) {
285 		BUG_ON(c->oc_conn == NULL);
286 		c->oc_conn->cc_recovery_handler(nodenum,
287 						c->oc_conn->cc_recovery_data);
288 	}
289 
290 	mutex_unlock(&ocfs2_control_lock);
291 }
292 
293 /*
294  * Called whenever configuration elements are sent to /dev/ocfs2_control.
295  * If all configuration elements are present, try to set the global
296  * values.  If there is a problem, return an error.  Skip any missing
297  * elements, and only bump ocfs2_control_opened when we have all elements
298  * and are successful.
299  */
300 static int ocfs2_control_install_private(struct file *file)
301 {
302 	int rc = 0;
303 	int set_p = 1;
304 	struct ocfs2_control_private *p = file->private_data;
305 
306 	BUG_ON(p->op_state != OCFS2_CONTROL_HANDSHAKE_PROTOCOL);
307 
308 	mutex_lock(&ocfs2_control_lock);
309 
310 	if (p->op_this_node < 0) {
311 		set_p = 0;
312 	} else if ((ocfs2_control_this_node >= 0) &&
313 		   (ocfs2_control_this_node != p->op_this_node)) {
314 		rc = -EINVAL;
315 		goto out_unlock;
316 	}
317 
318 	if (!p->op_proto.pv_major) {
319 		set_p = 0;
320 	} else if (!list_empty(&ocfs2_live_connection_list) &&
321 		   ((running_proto.pv_major != p->op_proto.pv_major) ||
322 		    (running_proto.pv_minor != p->op_proto.pv_minor))) {
323 		rc = -EINVAL;
324 		goto out_unlock;
325 	}
326 
327 	if (set_p) {
328 		ocfs2_control_this_node = p->op_this_node;
329 		running_proto.pv_major = p->op_proto.pv_major;
330 		running_proto.pv_minor = p->op_proto.pv_minor;
331 	}
332 
333 out_unlock:
334 	mutex_unlock(&ocfs2_control_lock);
335 
336 	if (!rc && set_p) {
337 		/* We set the global values successfully */
338 		atomic_inc(&ocfs2_control_opened);
339 		ocfs2_control_set_handshake_state(file,
340 					OCFS2_CONTROL_HANDSHAKE_VALID);
341 	}
342 
343 	return rc;
344 }
345 
346 static int ocfs2_control_get_this_node(void)
347 {
348 	int rc;
349 
350 	mutex_lock(&ocfs2_control_lock);
351 	if (ocfs2_control_this_node < 0)
352 		rc = -EINVAL;
353 	else
354 		rc = ocfs2_control_this_node;
355 	mutex_unlock(&ocfs2_control_lock);
356 
357 	return rc;
358 }
359 
360 static int ocfs2_control_do_setnode_msg(struct file *file,
361 					struct ocfs2_control_message_setn *msg)
362 {
363 	long nodenum;
364 	char *ptr = NULL;
365 	struct ocfs2_control_private *p = file->private_data;
366 
367 	if (ocfs2_control_get_handshake_state(file) !=
368 	    OCFS2_CONTROL_HANDSHAKE_PROTOCOL)
369 		return -EINVAL;
370 
371 	if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP,
372 		    OCFS2_CONTROL_MESSAGE_OP_LEN))
373 		return -EINVAL;
374 
375 	if ((msg->space != ' ') || (msg->newline != '\n'))
376 		return -EINVAL;
377 	msg->space = msg->newline = '\0';
378 
379 	nodenum = simple_strtol(msg->nodestr, &ptr, 16);
380 	if (!ptr || *ptr)
381 		return -EINVAL;
382 
383 	if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) ||
384 	    (nodenum > INT_MAX) || (nodenum < 0))
385 		return -ERANGE;
386 	p->op_this_node = nodenum;
387 
388 	return ocfs2_control_install_private(file);
389 }
390 
391 static int ocfs2_control_do_setversion_msg(struct file *file,
392 					   struct ocfs2_control_message_setv *msg)
393 {
394 	long major, minor;
395 	char *ptr = NULL;
396 	struct ocfs2_control_private *p = file->private_data;
397 	struct ocfs2_protocol_version *max =
398 		&ocfs2_user_plugin.sp_max_proto;
399 
400 	if (ocfs2_control_get_handshake_state(file) !=
401 	    OCFS2_CONTROL_HANDSHAKE_PROTOCOL)
402 		return -EINVAL;
403 
404 	if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP,
405 		    OCFS2_CONTROL_MESSAGE_OP_LEN))
406 		return -EINVAL;
407 
408 	if ((msg->space1 != ' ') || (msg->space2 != ' ') ||
409 	    (msg->newline != '\n'))
410 		return -EINVAL;
411 	msg->space1 = msg->space2 = msg->newline = '\0';
412 
413 	major = simple_strtol(msg->major, &ptr, 16);
414 	if (!ptr || *ptr)
415 		return -EINVAL;
416 	minor = simple_strtol(msg->minor, &ptr, 16);
417 	if (!ptr || *ptr)
418 		return -EINVAL;
419 
420 	/*
421 	 * The major must be between 1 and 255, inclusive.  The minor
422 	 * must be between 0 and 255, inclusive.  The version passed in
423 	 * must be within the maximum version supported by the filesystem.
424 	 */
425 	if ((major == LONG_MIN) || (major == LONG_MAX) ||
426 	    (major > (u8)-1) || (major < 1))
427 		return -ERANGE;
428 	if ((minor == LONG_MIN) || (minor == LONG_MAX) ||
429 	    (minor > (u8)-1) || (minor < 0))
430 		return -ERANGE;
431 	if ((major != max->pv_major) ||
432 	    (minor > max->pv_minor))
433 		return -EINVAL;
434 
435 	p->op_proto.pv_major = major;
436 	p->op_proto.pv_minor = minor;
437 
438 	return ocfs2_control_install_private(file);
439 }
440 
441 static int ocfs2_control_do_down_msg(struct file *file,
442 				     struct ocfs2_control_message_down *msg)
443 {
444 	long nodenum;
445 	char *p = NULL;
446 
447 	if (ocfs2_control_get_handshake_state(file) !=
448 	    OCFS2_CONTROL_HANDSHAKE_VALID)
449 		return -EINVAL;
450 
451 	if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_DOWN_OP,
452 		    OCFS2_CONTROL_MESSAGE_OP_LEN))
453 		return -EINVAL;
454 
455 	if ((msg->space1 != ' ') || (msg->space2 != ' ') ||
456 	    (msg->newline != '\n'))
457 		return -EINVAL;
458 	msg->space1 = msg->space2 = msg->newline = '\0';
459 
460 	nodenum = simple_strtol(msg->nodestr, &p, 16);
461 	if (!p || *p)
462 		return -EINVAL;
463 
464 	if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) ||
465 	    (nodenum > INT_MAX) || (nodenum < 0))
466 		return -ERANGE;
467 
468 	ocfs2_control_send_down(msg->uuid, nodenum);
469 
470 	return 0;
471 }
472 
473 static ssize_t ocfs2_control_message(struct file *file,
474 				     const char __user *buf,
475 				     size_t count)
476 {
477 	ssize_t ret;
478 	union ocfs2_control_message msg;
479 
480 	/* Try to catch padding issues */
481 	WARN_ON(offsetof(struct ocfs2_control_message_down, uuid) !=
482 		(sizeof(msg.u_down.tag) + sizeof(msg.u_down.space1)));
483 
484 	memset(&msg, 0, sizeof(union ocfs2_control_message));
485 	ret = ocfs2_control_cfu(&msg, count, buf, count);
486 	if (ret)
487 		goto out;
488 
489 	if ((count == OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN) &&
490 	    !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP,
491 		     OCFS2_CONTROL_MESSAGE_OP_LEN))
492 		ret = ocfs2_control_do_setnode_msg(file, &msg.u_setn);
493 	else if ((count == OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN) &&
494 		 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP,
495 			  OCFS2_CONTROL_MESSAGE_OP_LEN))
496 		ret = ocfs2_control_do_setversion_msg(file, &msg.u_setv);
497 	else if ((count == OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN) &&
498 		 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_DOWN_OP,
499 			  OCFS2_CONTROL_MESSAGE_OP_LEN))
500 		ret = ocfs2_control_do_down_msg(file, &msg.u_down);
501 	else
502 		ret = -EINVAL;
503 
504 out:
505 	return ret ? ret : count;
506 }
507 
508 static ssize_t ocfs2_control_write(struct file *file,
509 				   const char __user *buf,
510 				   size_t count,
511 				   loff_t *ppos)
512 {
513 	ssize_t ret;
514 
515 	switch (ocfs2_control_get_handshake_state(file)) {
516 		case OCFS2_CONTROL_HANDSHAKE_INVALID:
517 			ret = -EINVAL;
518 			break;
519 
520 		case OCFS2_CONTROL_HANDSHAKE_READ:
521 			ret = ocfs2_control_validate_protocol(file, buf,
522 							      count);
523 			break;
524 
525 		case OCFS2_CONTROL_HANDSHAKE_PROTOCOL:
526 		case OCFS2_CONTROL_HANDSHAKE_VALID:
527 			ret = ocfs2_control_message(file, buf, count);
528 			break;
529 
530 		default:
531 			BUG();
532 			ret = -EIO;
533 			break;
534 	}
535 
536 	return ret;
537 }
538 
539 /*
540  * This is a naive version.  If we ever have a new protocol, we'll expand
541  * it.  Probably using seq_file.
542  */
543 static ssize_t ocfs2_control_read(struct file *file,
544 				  char __user *buf,
545 				  size_t count,
546 				  loff_t *ppos)
547 {
548 	ssize_t ret;
549 
550 	ret = simple_read_from_buffer(buf, count, ppos,
551 			OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN);
552 
553 	/* Have we read the whole protocol list? */
554 	if (ret > 0 && *ppos >= OCFS2_CONTROL_PROTO_LEN)
555 		ocfs2_control_set_handshake_state(file,
556 						  OCFS2_CONTROL_HANDSHAKE_READ);
557 
558 	return ret;
559 }
560 
561 static int ocfs2_control_release(struct inode *inode, struct file *file)
562 {
563 	struct ocfs2_control_private *p = file->private_data;
564 
565 	mutex_lock(&ocfs2_control_lock);
566 
567 	if (ocfs2_control_get_handshake_state(file) !=
568 	    OCFS2_CONTROL_HANDSHAKE_VALID)
569 		goto out;
570 
571 	if (atomic_dec_and_test(&ocfs2_control_opened)) {
572 		if (!list_empty(&ocfs2_live_connection_list)) {
573 			/* XXX: Do bad things! */
574 			printk(KERN_ERR
575 			       "ocfs2: Unexpected release of ocfs2_control!\n"
576 			       "       Loss of cluster connection requires "
577 			       "an emergency restart!\n");
578 			emergency_restart();
579 		}
580 		/*
581 		 * Last valid close clears the node number and resets
582 		 * the locking protocol version
583 		 */
584 		ocfs2_control_this_node = -1;
585 		running_proto.pv_major = 0;
586 		running_proto.pv_minor = 0;
587 	}
588 
589 out:
590 	list_del_init(&p->op_list);
591 	file->private_data = NULL;
592 
593 	mutex_unlock(&ocfs2_control_lock);
594 
595 	kfree(p);
596 
597 	return 0;
598 }
599 
600 static int ocfs2_control_open(struct inode *inode, struct file *file)
601 {
602 	struct ocfs2_control_private *p;
603 
604 	p = kzalloc(sizeof(struct ocfs2_control_private), GFP_KERNEL);
605 	if (!p)
606 		return -ENOMEM;
607 	p->op_this_node = -1;
608 
609 	mutex_lock(&ocfs2_control_lock);
610 	file->private_data = p;
611 	list_add(&p->op_list, &ocfs2_control_private_list);
612 	mutex_unlock(&ocfs2_control_lock);
613 
614 	return 0;
615 }
616 
617 static const struct file_operations ocfs2_control_fops = {
618 	.open    = ocfs2_control_open,
619 	.release = ocfs2_control_release,
620 	.read    = ocfs2_control_read,
621 	.write   = ocfs2_control_write,
622 	.owner   = THIS_MODULE,
623 	.llseek  = default_llseek,
624 };
625 
626 static struct miscdevice ocfs2_control_device = {
627 	.minor		= MISC_DYNAMIC_MINOR,
628 	.name		= "ocfs2_control",
629 	.fops		= &ocfs2_control_fops,
630 };
631 
632 static int ocfs2_control_init(void)
633 {
634 	int rc;
635 
636 	atomic_set(&ocfs2_control_opened, 0);
637 
638 	rc = misc_register(&ocfs2_control_device);
639 	if (rc)
640 		printk(KERN_ERR
641 		       "ocfs2: Unable to register ocfs2_control device "
642 		       "(errno %d)\n",
643 		       -rc);
644 
645 	return rc;
646 }
647 
648 static void ocfs2_control_exit(void)
649 {
650 	misc_deregister(&ocfs2_control_device);
651 }
652 
653 static void fsdlm_lock_ast_wrapper(void *astarg)
654 {
655 	struct ocfs2_dlm_lksb *lksb = astarg;
656 	int status = lksb->lksb_fsdlm.sb_status;
657 
658 	/*
659 	 * For now we're punting on the issue of other non-standard errors
660 	 * where we can't tell if the unlock_ast or lock_ast should be called.
661 	 * The main "other error" that's possible is EINVAL which means the
662 	 * function was called with invalid args, which shouldn't be possible
663 	 * since the caller here is under our control.  Other non-standard
664 	 * errors probably fall into the same category, or otherwise are fatal
665 	 * which means we can't carry on anyway.
666 	 */
667 
668 	if (status == -DLM_EUNLOCK || status == -DLM_ECANCEL)
669 		lksb->lksb_conn->cc_proto->lp_unlock_ast(lksb, 0);
670 	else
671 		lksb->lksb_conn->cc_proto->lp_lock_ast(lksb);
672 }
673 
674 static void fsdlm_blocking_ast_wrapper(void *astarg, int level)
675 {
676 	struct ocfs2_dlm_lksb *lksb = astarg;
677 
678 	lksb->lksb_conn->cc_proto->lp_blocking_ast(lksb, level);
679 }
680 
681 static int user_dlm_lock(struct ocfs2_cluster_connection *conn,
682 			 int mode,
683 			 struct ocfs2_dlm_lksb *lksb,
684 			 u32 flags,
685 			 void *name,
686 			 unsigned int namelen)
687 {
688 	int ret;
689 
690 	if (!lksb->lksb_fsdlm.sb_lvbptr)
691 		lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb +
692 					     sizeof(struct dlm_lksb);
693 
694 	ret = dlm_lock(conn->cc_lockspace, mode, &lksb->lksb_fsdlm,
695 		       flags|DLM_LKF_NODLCKWT, name, namelen, 0,
696 		       fsdlm_lock_ast_wrapper, lksb,
697 		       fsdlm_blocking_ast_wrapper);
698 	return ret;
699 }
700 
701 static int user_dlm_unlock(struct ocfs2_cluster_connection *conn,
702 			   struct ocfs2_dlm_lksb *lksb,
703 			   u32 flags)
704 {
705 	int ret;
706 
707 	ret = dlm_unlock(conn->cc_lockspace, lksb->lksb_fsdlm.sb_lkid,
708 			 flags, &lksb->lksb_fsdlm, lksb);
709 	return ret;
710 }
711 
712 static int user_dlm_lock_status(struct ocfs2_dlm_lksb *lksb)
713 {
714 	return lksb->lksb_fsdlm.sb_status;
715 }
716 
717 static int user_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb)
718 {
719 	int invalid = lksb->lksb_fsdlm.sb_flags & DLM_SBF_VALNOTVALID;
720 
721 	return !invalid;
722 }
723 
724 static void *user_dlm_lvb(struct ocfs2_dlm_lksb *lksb)
725 {
726 	if (!lksb->lksb_fsdlm.sb_lvbptr)
727 		lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb +
728 					     sizeof(struct dlm_lksb);
729 	return (void *)(lksb->lksb_fsdlm.sb_lvbptr);
730 }
731 
732 static void user_dlm_dump_lksb(struct ocfs2_dlm_lksb *lksb)
733 {
734 }
735 
736 static int user_plock(struct ocfs2_cluster_connection *conn,
737 		      u64 ino,
738 		      struct file *file,
739 		      int cmd,
740 		      struct file_lock *fl)
741 {
742 	/*
743 	 * This more or less just demuxes the plock request into any
744 	 * one of three dlm calls.
745 	 *
746 	 * Internally, fs/dlm will pass these to a misc device, which
747 	 * a userspace daemon will read and write to.
748 	 *
749 	 * For now, cancel requests (which happen internally only),
750 	 * are turned into unlocks. Most of this function taken from
751 	 * gfs2_lock.
752 	 */
753 
754 	if (cmd == F_CANCELLK) {
755 		cmd = F_SETLK;
756 		fl->fl_type = F_UNLCK;
757 	}
758 
759 	if (IS_GETLK(cmd))
760 		return dlm_posix_get(conn->cc_lockspace, ino, file, fl);
761 	else if (fl->fl_type == F_UNLCK)
762 		return dlm_posix_unlock(conn->cc_lockspace, ino, file, fl);
763 	else
764 		return dlm_posix_lock(conn->cc_lockspace, ino, file, cmd, fl);
765 }
766 
767 /*
768  * Compare a requested locking protocol version against the current one.
769  *
770  * If the major numbers are different, they are incompatible.
771  * If the current minor is greater than the request, they are incompatible.
772  * If the current minor is less than or equal to the request, they are
773  * compatible, and the requester should run at the current minor version.
774  */
775 static int fs_protocol_compare(struct ocfs2_protocol_version *existing,
776 			       struct ocfs2_protocol_version *request)
777 {
778 	if (existing->pv_major != request->pv_major)
779 		return 1;
780 
781 	if (existing->pv_minor > request->pv_minor)
782 		return 1;
783 
784 	if (existing->pv_minor < request->pv_minor)
785 		request->pv_minor = existing->pv_minor;
786 
787 	return 0;
788 }
789 
790 static void lvb_to_version(char *lvb, struct ocfs2_protocol_version *ver)
791 {
792 	struct ocfs2_protocol_version *pv =
793 		(struct ocfs2_protocol_version *)lvb;
794 	/*
795 	 * ocfs2_protocol_version has two u8 variables, so we don't
796 	 * need any endian conversion.
797 	 */
798 	ver->pv_major = pv->pv_major;
799 	ver->pv_minor = pv->pv_minor;
800 }
801 
802 static void version_to_lvb(struct ocfs2_protocol_version *ver, char *lvb)
803 {
804 	struct ocfs2_protocol_version *pv =
805 		(struct ocfs2_protocol_version *)lvb;
806 	/*
807 	 * ocfs2_protocol_version has two u8 variables, so we don't
808 	 * need any endian conversion.
809 	 */
810 	pv->pv_major = ver->pv_major;
811 	pv->pv_minor = ver->pv_minor;
812 }
813 
814 static void sync_wait_cb(void *arg)
815 {
816 	struct ocfs2_cluster_connection *conn = arg;
817 	struct ocfs2_live_connection *lc = conn->cc_private;
818 	complete(&lc->oc_sync_wait);
819 }
820 
821 static int sync_unlock(struct ocfs2_cluster_connection *conn,
822 		struct dlm_lksb *lksb, char *name)
823 {
824 	int error;
825 	struct ocfs2_live_connection *lc = conn->cc_private;
826 
827 	error = dlm_unlock(conn->cc_lockspace, lksb->sb_lkid, 0, lksb, conn);
828 	if (error) {
829 		printk(KERN_ERR "%s lkid %x error %d\n",
830 				name, lksb->sb_lkid, error);
831 		return error;
832 	}
833 
834 	wait_for_completion(&lc->oc_sync_wait);
835 
836 	if (lksb->sb_status != -DLM_EUNLOCK) {
837 		printk(KERN_ERR "%s lkid %x status %d\n",
838 				name, lksb->sb_lkid, lksb->sb_status);
839 		return -1;
840 	}
841 	return 0;
842 }
843 
844 static int sync_lock(struct ocfs2_cluster_connection *conn,
845 		int mode, uint32_t flags,
846 		struct dlm_lksb *lksb, char *name)
847 {
848 	int error, status;
849 	struct ocfs2_live_connection *lc = conn->cc_private;
850 
851 	error = dlm_lock(conn->cc_lockspace, mode, lksb, flags,
852 			name, strlen(name),
853 			0, sync_wait_cb, conn, NULL);
854 	if (error) {
855 		printk(KERN_ERR "%s lkid %x flags %x mode %d error %d\n",
856 				name, lksb->sb_lkid, flags, mode, error);
857 		return error;
858 	}
859 
860 	wait_for_completion(&lc->oc_sync_wait);
861 
862 	status = lksb->sb_status;
863 
864 	if (status && status != -EAGAIN) {
865 		printk(KERN_ERR "%s lkid %x flags %x mode %d status %d\n",
866 				name, lksb->sb_lkid, flags, mode, status);
867 	}
868 
869 	return status;
870 }
871 
872 
873 static int version_lock(struct ocfs2_cluster_connection *conn, int mode,
874 		int flags)
875 {
876 	struct ocfs2_live_connection *lc = conn->cc_private;
877 	return sync_lock(conn, mode, flags,
878 			&lc->oc_version_lksb, VERSION_LOCK);
879 }
880 
881 static int version_unlock(struct ocfs2_cluster_connection *conn)
882 {
883 	struct ocfs2_live_connection *lc = conn->cc_private;
884 	return sync_unlock(conn, &lc->oc_version_lksb, VERSION_LOCK);
885 }
886 
887 /* get_protocol_version()
888  *
889  * To exchange ocfs2 versioning, we use the LVB of the version dlm lock.
890  * The algorithm is:
891  * 1. Attempt to take the lock in EX mode (non-blocking).
892  * 2. If successful (which means it is the first mount), write the
893  *    version number and downconvert to PR lock.
894  * 3. If unsuccessful (returns -EAGAIN), read the version from the LVB after
895  *    taking the PR lock.
896  */
897 
898 static int get_protocol_version(struct ocfs2_cluster_connection *conn)
899 {
900 	int ret;
901 	struct ocfs2_live_connection *lc = conn->cc_private;
902 	struct ocfs2_protocol_version pv;
903 
904 	running_proto.pv_major =
905 		ocfs2_user_plugin.sp_max_proto.pv_major;
906 	running_proto.pv_minor =
907 		ocfs2_user_plugin.sp_max_proto.pv_minor;
908 
909 	lc->oc_version_lksb.sb_lvbptr = lc->oc_lvb;
910 	ret = version_lock(conn, DLM_LOCK_EX,
911 			DLM_LKF_VALBLK|DLM_LKF_NOQUEUE);
912 	if (!ret) {
913 		conn->cc_version.pv_major = running_proto.pv_major;
914 		conn->cc_version.pv_minor = running_proto.pv_minor;
915 		version_to_lvb(&running_proto, lc->oc_lvb);
916 		version_lock(conn, DLM_LOCK_PR, DLM_LKF_CONVERT|DLM_LKF_VALBLK);
917 	} else if (ret == -EAGAIN) {
918 		ret = version_lock(conn, DLM_LOCK_PR, DLM_LKF_VALBLK);
919 		if (ret)
920 			goto out;
921 		lvb_to_version(lc->oc_lvb, &pv);
922 
923 		if ((pv.pv_major != running_proto.pv_major) ||
924 				(pv.pv_minor > running_proto.pv_minor)) {
925 			ret = -EINVAL;
926 			goto out;
927 		}
928 
929 		conn->cc_version.pv_major = pv.pv_major;
930 		conn->cc_version.pv_minor = pv.pv_minor;
931 	}
932 out:
933 	return ret;
934 }
935 
936 static void user_recover_prep(void *arg)
937 {
938 }
939 
940 static void user_recover_slot(void *arg, struct dlm_slot *slot)
941 {
942 	struct ocfs2_cluster_connection *conn = arg;
943 	printk(KERN_INFO "ocfs2: Node %d/%d down. Initiating recovery.\n",
944 			slot->nodeid, slot->slot);
945 	conn->cc_recovery_handler(slot->nodeid, conn->cc_recovery_data);
946 
947 }
948 
949 static void user_recover_done(void *arg, struct dlm_slot *slots,
950 		int num_slots, int our_slot,
951 		uint32_t generation)
952 {
953 	struct ocfs2_cluster_connection *conn = arg;
954 	struct ocfs2_live_connection *lc = conn->cc_private;
955 	int i;
956 
957 	for (i = 0; i < num_slots; i++)
958 		if (slots[i].slot == our_slot) {
959 			atomic_set(&lc->oc_this_node, slots[i].nodeid);
960 			break;
961 		}
962 
963 	lc->oc_our_slot = our_slot;
964 	wake_up(&lc->oc_wait);
965 }
966 
967 static const struct dlm_lockspace_ops ocfs2_ls_ops = {
968 	.recover_prep = user_recover_prep,
969 	.recover_slot = user_recover_slot,
970 	.recover_done = user_recover_done,
971 };
972 
973 static int user_cluster_disconnect(struct ocfs2_cluster_connection *conn)
974 {
975 	version_unlock(conn);
976 	dlm_release_lockspace(conn->cc_lockspace, 2);
977 	conn->cc_lockspace = NULL;
978 	ocfs2_live_connection_drop(conn->cc_private);
979 	conn->cc_private = NULL;
980 	return 0;
981 }
982 
983 static int user_cluster_connect(struct ocfs2_cluster_connection *conn)
984 {
985 	dlm_lockspace_t *fsdlm;
986 	struct ocfs2_live_connection *lc;
987 	int rc, ops_rv;
988 
989 	BUG_ON(conn == NULL);
990 
991 	lc = kzalloc(sizeof(struct ocfs2_live_connection), GFP_KERNEL);
992 	if (!lc)
993 		return -ENOMEM;
994 
995 	init_waitqueue_head(&lc->oc_wait);
996 	init_completion(&lc->oc_sync_wait);
997 	atomic_set(&lc->oc_this_node, 0);
998 	conn->cc_private = lc;
999 	lc->oc_type = NO_CONTROLD;
1000 
1001 	rc = dlm_new_lockspace(conn->cc_name, conn->cc_cluster_name,
1002 			       DLM_LSFL_FS | DLM_LSFL_NEWEXCL, DLM_LVB_LEN,
1003 			       &ocfs2_ls_ops, conn, &ops_rv, &fsdlm);
1004 	if (rc) {
1005 		if (rc == -EEXIST || rc == -EPROTO)
1006 			printk(KERN_ERR "ocfs2: Unable to create the "
1007 				"lockspace %s (%d), because a ocfs2-tools "
1008 				"program is running on this file system "
1009 				"with the same name lockspace\n",
1010 				conn->cc_name, rc);
1011 		goto out;
1012 	}
1013 
1014 	if (ops_rv == -EOPNOTSUPP) {
1015 		lc->oc_type = WITH_CONTROLD;
1016 		printk(KERN_NOTICE "ocfs2: You seem to be using an older "
1017 				"version of dlm_controld and/or ocfs2-tools."
1018 				" Please consider upgrading.\n");
1019 	} else if (ops_rv) {
1020 		rc = ops_rv;
1021 		goto out;
1022 	}
1023 	conn->cc_lockspace = fsdlm;
1024 
1025 	rc = ocfs2_live_connection_attach(conn, lc);
1026 	if (rc)
1027 		goto out;
1028 
1029 	if (lc->oc_type == NO_CONTROLD) {
1030 		rc = get_protocol_version(conn);
1031 		if (rc) {
1032 			printk(KERN_ERR "ocfs2: Could not determine"
1033 					" locking version\n");
1034 			user_cluster_disconnect(conn);
1035 			goto out;
1036 		}
1037 		wait_event(lc->oc_wait, (atomic_read(&lc->oc_this_node) > 0));
1038 	}
1039 
1040 	/*
1041 	 * running_proto must have been set before we allowed any mounts
1042 	 * to proceed.
1043 	 */
1044 	if (fs_protocol_compare(&running_proto, &conn->cc_version)) {
1045 		printk(KERN_ERR
1046 		       "Unable to mount with fs locking protocol version "
1047 		       "%u.%u because negotiated protocol is %u.%u\n",
1048 		       conn->cc_version.pv_major, conn->cc_version.pv_minor,
1049 		       running_proto.pv_major, running_proto.pv_minor);
1050 		rc = -EPROTO;
1051 		ocfs2_live_connection_drop(lc);
1052 		lc = NULL;
1053 	}
1054 
1055 out:
1056 	if (rc)
1057 		kfree(lc);
1058 	return rc;
1059 }
1060 
1061 
1062 static int user_cluster_this_node(struct ocfs2_cluster_connection *conn,
1063 				  unsigned int *this_node)
1064 {
1065 	int rc;
1066 	struct ocfs2_live_connection *lc = conn->cc_private;
1067 
1068 	if (lc->oc_type == WITH_CONTROLD)
1069 		rc = ocfs2_control_get_this_node();
1070 	else if (lc->oc_type == NO_CONTROLD)
1071 		rc = atomic_read(&lc->oc_this_node);
1072 	else
1073 		rc = -EINVAL;
1074 
1075 	if (rc < 0)
1076 		return rc;
1077 
1078 	*this_node = rc;
1079 	return 0;
1080 }
1081 
1082 static struct ocfs2_stack_operations ocfs2_user_plugin_ops = {
1083 	.connect	= user_cluster_connect,
1084 	.disconnect	= user_cluster_disconnect,
1085 	.this_node	= user_cluster_this_node,
1086 	.dlm_lock	= user_dlm_lock,
1087 	.dlm_unlock	= user_dlm_unlock,
1088 	.lock_status	= user_dlm_lock_status,
1089 	.lvb_valid	= user_dlm_lvb_valid,
1090 	.lock_lvb	= user_dlm_lvb,
1091 	.plock		= user_plock,
1092 	.dump_lksb	= user_dlm_dump_lksb,
1093 };
1094 
1095 static struct ocfs2_stack_plugin ocfs2_user_plugin = {
1096 	.sp_name	= "user",
1097 	.sp_ops		= &ocfs2_user_plugin_ops,
1098 	.sp_owner	= THIS_MODULE,
1099 };
1100 
1101 
1102 static int __init ocfs2_user_plugin_init(void)
1103 {
1104 	int rc;
1105 
1106 	rc = ocfs2_control_init();
1107 	if (!rc) {
1108 		rc = ocfs2_stack_glue_register(&ocfs2_user_plugin);
1109 		if (rc)
1110 			ocfs2_control_exit();
1111 	}
1112 
1113 	return rc;
1114 }
1115 
1116 static void __exit ocfs2_user_plugin_exit(void)
1117 {
1118 	ocfs2_stack_glue_unregister(&ocfs2_user_plugin);
1119 	ocfs2_control_exit();
1120 }
1121 
1122 MODULE_AUTHOR("Oracle");
1123 MODULE_DESCRIPTION("ocfs2 driver for userspace cluster stacks");
1124 MODULE_LICENSE("GPL");
1125 module_init(ocfs2_user_plugin_init);
1126 module_exit(ocfs2_user_plugin_exit);
1127