xref: /linux/fs/ocfs2/stack_user.c (revision 57dcfd9049d497c31151787a0696d59f0a98f8e6)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * stack_user.c
4  *
5  * Code which interfaces ocfs2 with fs/dlm and a userspace stack.
6  *
7  * Copyright (C) 2007 Oracle.  All rights reserved.
8  */
9 
10 #include <linux/module.h>
11 #include <linux/fs.h>
12 #include <linux/filelock.h>
13 #include <linux/miscdevice.h>
14 #include <linux/mutex.h>
15 #include <linux/slab.h>
16 #include <linux/reboot.h>
17 #include <linux/sched.h>
18 #include <linux/uaccess.h>
19 
20 #include "stackglue.h"
21 
22 #include <linux/dlm_plock.h>
23 
24 /*
25  * The control protocol starts with a handshake.  Until the handshake
26  * is complete, the control device will fail all write(2)s.
27  *
 * The handshake is simple.  First, the client reads until EOF.  Each line
 * of output is a supported protocol tag.  All protocol tags are a single
 * character followed by a two hex digit version number.  Currently the
 * only tag supported is T01, for "Text-based version 0x01".  Next, the
 * client writes the version it would like to use, including the newline.
 * Thus, the protocol tag is 'T01\n'.  If the version tag written is
 * unknown, -EINVAL is returned.  Once the negotiation is complete, the
 * client can start sending messages.
36  *
37  * The T01 protocol has three messages.  First is the "SETN" message.
38  * It has the following syntax:
39  *
40  *  SETN<space><8-char-hex-nodenum><newline>
41  *
42  * This is 14 characters.
43  *
44  * The "SETN" message must be the first message following the protocol.
45  * It tells ocfs2_control the local node number.
46  *
47  * Next comes the "SETV" message.  It has the following syntax:
48  *
49  *  SETV<space><2-char-hex-major><space><2-char-hex-minor><newline>
50  *
51  * This is 11 characters.
52  *
 * The "SETV" message sets the filesystem locking protocol version as
 * negotiated by the client.  The client negotiates based on the maximum
 * version advertised in /sys/fs/ocfs2/max_locking_protocol.  The major
 * number from the "SETV" message must match
 * ocfs2_user_plugin.sp_max_proto.pv_major, and the minor number
 * must be less than or equal to ocfs2_user_plugin.sp_max_proto.pv_minor.
59  *
60  * Once this information has been set, mounts will be allowed.  From this
61  * point on, the "DOWN" message can be sent for node down notification.
62  * It has the following syntax:
63  *
64  *  DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline>
65  *
66  * eg:
67  *
68  *  DOWN 632A924FDD844190BDA93C0DF6B94899 00000001\n
69  *
70  * This is 47 characters.
71  */
72 
/*
 * The protocol tag offered on read(2) and expected back on the first
 * write(2).  For now, we have just one protocol version.
 */
#define OCFS2_CONTROL_PROTO			"T01\n"
#define OCFS2_CONTROL_PROTO_LEN			4

/*
 * Handshake states, stored in ocfs2_control_private.op_state:
 *   INVALID  - nothing has happened yet; every write(2) fails.
 *   READ     - the protocol list has been read; a protocol tag may be written.
 *   PROTOCOL - a valid protocol tag was written; SETN/SETV are accepted.
 *   VALID    - node number and version are installed; DOWN is accepted.
 */
#define OCFS2_CONTROL_HANDSHAKE_INVALID		(0)
#define OCFS2_CONTROL_HANDSHAKE_READ		(1)
#define OCFS2_CONTROL_HANDSHAKE_PROTOCOL	(2)
#define OCFS2_CONTROL_HANDSHAKE_VALID		(3)

/*
 * Messages.  Each *_TOTAL_LEN is the exact byte count of one message,
 * including separators and the trailing newline:
 *   SETN: 4 + 1 + 8 + 1              = 14
 *   SETV: 4 + 1 + 2 + 1 + 2 + 1      = 11
 *   DOWN: 4 + 1 + 32 + 1 + 8 + 1     = 47
 */
#define OCFS2_CONTROL_MESSAGE_OP_LEN		4
#define OCFS2_CONTROL_MESSAGE_SETNODE_OP	"SETN"
#define OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN	14
#define OCFS2_CONTROL_MESSAGE_SETVERSION_OP	"SETV"
#define OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN	11
#define OCFS2_CONTROL_MESSAGE_DOWN_OP		"DOWN"
#define OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN	47
#define OCFS2_TEXT_UUID_LEN			32
#define OCFS2_CONTROL_MESSAGE_VERNUM_LEN	2
#define OCFS2_CONTROL_MESSAGE_NODENUM_LEN	8
/* Name of the dlm lock whose LVB carries the locking protocol version. */
#define VERSION_LOCK				"version_lock"
98 
/*
 * How a live connection obtains its node number and protocol version.
 */
enum ocfs2_connection_type {
	/* Values come from the ocfs2_control device (SETN/SETV messages). */
	WITH_CONTROLD = 0,
	/* Values come from dlm recovery callbacks and the version lock. */
	NO_CONTROLD = 1,
};
103 
/*
 * Per-connection state for one cluster connection using this stack.
 * The filesystem and miscdevice sides can detach in different order.
 *
 * NOTE(review): this structure carries no reference count; it is
 * allocated in user_cluster_connect() and freed directly by
 * ocfs2_live_connection_drop().
 */
struct ocfs2_live_connection {
	/* Linkage on ocfs2_live_connection_list. */
	struct list_head		oc_list;
	/* Owning cluster connection; set on attach, cleared on drop. */
	struct ocfs2_cluster_connection	*oc_conn;
	enum ocfs2_connection_type	oc_type;
	/* Local node id, filled in by user_recover_done(). */
	atomic_t                        oc_this_node;
	/* Our slot as reported to user_recover_done(). */
	int                             oc_our_slot;
	/* lksb of the version lock; its sb_lvbptr points at oc_lvb. */
	struct dlm_lksb                 oc_version_lksb;
	char                            oc_lvb[DLM_LVB_LEN];
	/* Completed by sync_wait_cb(); waited on by sync_lock()/sync_unlock(). */
	struct completion               oc_sync_wait;
	/* Woken by user_recover_done(); waited on in user_cluster_connect(). */
	wait_queue_head_t		oc_wait;
};
119 
/*
 * Per-open state of the ocfs2_control device, stored in
 * file->private_data.
 */
struct ocfs2_control_private {
	/* Linkage on ocfs2_control_private_list. */
	struct list_head op_list;
	/* One of the OCFS2_CONTROL_HANDSHAKE_* states. */
	int op_state;
	/* Node number from SETN; -1 until one has been received. */
	int op_this_node;
	/* Locking protocol version from SETV; pv_major == 0 means unset. */
	struct ocfs2_protocol_version op_proto;
};
126 
/*
 * SETN<space><8-char-hex-nodenum><newline>
 *
 * Byte-for-byte overlay of the message as written by userspace.  The
 * parser overwrites the separators with NUL, which terminates nodestr.
 */
struct ocfs2_control_message_setn {
	char	tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
	char	space;
	char	nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN];
	char	newline;
};
134 
/*
 * SETV<space><2-char-hex-major><space><2-char-hex-minor><newline>
 *
 * Byte-for-byte overlay of the message as written by userspace.  The
 * parser overwrites space2 and newline with NUL, which terminates the
 * major and minor fields respectively.
 */
struct ocfs2_control_message_setv {
	char	tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
	char	space1;
	char	major[OCFS2_CONTROL_MESSAGE_VERNUM_LEN];
	char	space2;
	char	minor[OCFS2_CONTROL_MESSAGE_VERNUM_LEN];
	char	newline;
};
144 
/*
 * DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline>
 *
 * Byte-for-byte overlay of the message as written by userspace.  The
 * parser overwrites space2 and newline with NUL, which terminates the
 * uuid and nodestr fields respectively.
 */
struct ocfs2_control_message_down {
	char	tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
	char	space1;
	char	uuid[OCFS2_TEXT_UUID_LEN];
	char	space2;
	char	nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN];
	char	newline;
};
154 
/*
 * Buffer able to hold any one control message.  All variants begin with
 * the 4-byte op tag, so 'tag' can be inspected before the message type
 * is known.  sizeof() this union bounds the length of a write(2).
 */
union ocfs2_control_message {
	char					tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
	struct ocfs2_control_message_setn	u_setn;
	struct ocfs2_control_message_setv	u_setv;
	struct ocfs2_control_message_down	u_down;
};
161 
/* Forward declaration; the initialized definition is further down. */
static struct ocfs2_stack_plugin ocfs2_user_plugin;

/* Number of control-device opens that reached HANDSHAKE_VALID. */
static atomic_t ocfs2_control_opened;
/* Local node number installed via SETN; -1 while unset. */
static int ocfs2_control_this_node = -1;
/* Locking protocol version currently in effect. */
static struct ocfs2_protocol_version running_proto;

/* All attached ocfs2_live_connection structures. */
static LIST_HEAD(ocfs2_live_connection_list);
/* All ocfs2_control_private structures of open control-device files. */
static LIST_HEAD(ocfs2_control_private_list);
/* Held by the control-device paths while touching the lists and globals above. */
static DEFINE_MUTEX(ocfs2_control_lock);
171 
172 static inline void ocfs2_control_set_handshake_state(struct file *file,
173 						     int state)
174 {
175 	struct ocfs2_control_private *p = file->private_data;
176 	p->op_state = state;
177 }
178 
179 static inline int ocfs2_control_get_handshake_state(struct file *file)
180 {
181 	struct ocfs2_control_private *p = file->private_data;
182 	return p->op_state;
183 }
184 
185 static struct ocfs2_live_connection *ocfs2_connection_find(const char *name)
186 {
187 	size_t len = strlen(name);
188 	struct ocfs2_live_connection *c;
189 
190 	BUG_ON(!mutex_is_locked(&ocfs2_control_lock));
191 
192 	list_for_each_entry(c, &ocfs2_live_connection_list, oc_list) {
193 		if ((c->oc_conn->cc_namelen == len) &&
194 		    !strncmp(c->oc_conn->cc_name, name, len))
195 			return c;
196 	}
197 
198 	return NULL;
199 }
200 
201 /*
202  * ocfs2_live_connection structures are created underneath the ocfs2
203  * mount path.  Since the VFS prevents multiple calls to
204  * fill_super(), we can't get dupes here.
205  */
206 static int ocfs2_live_connection_attach(struct ocfs2_cluster_connection *conn,
207 				     struct ocfs2_live_connection *c)
208 {
209 	int rc = 0;
210 
211 	mutex_lock(&ocfs2_control_lock);
212 	c->oc_conn = conn;
213 
214 	if ((c->oc_type == NO_CONTROLD) || atomic_read(&ocfs2_control_opened))
215 		list_add(&c->oc_list, &ocfs2_live_connection_list);
216 	else {
217 		printk(KERN_ERR
218 		       "ocfs2: Userspace control daemon is not present\n");
219 		rc = -ESRCH;
220 	}
221 
222 	mutex_unlock(&ocfs2_control_lock);
223 	return rc;
224 }
225 
226 /*
227  * This function disconnects the cluster connection from ocfs2_control.
228  * Afterwards, userspace can't affect the cluster connection.
229  */
230 static void ocfs2_live_connection_drop(struct ocfs2_live_connection *c)
231 {
232 	mutex_lock(&ocfs2_control_lock);
233 	list_del_init(&c->oc_list);
234 	c->oc_conn = NULL;
235 	mutex_unlock(&ocfs2_control_lock);
236 
237 	kfree(c);
238 }
239 
240 static int ocfs2_control_cfu(void *target, size_t target_len,
241 			     const char __user *buf, size_t count)
242 {
243 	/* The T01 expects write(2) calls to have exactly one command */
244 	if ((count != target_len) ||
245 	    (count > sizeof(union ocfs2_control_message)))
246 		return -EINVAL;
247 
248 	if (copy_from_user(target, buf, target_len))
249 		return -EFAULT;
250 
251 	return 0;
252 }
253 
254 static ssize_t ocfs2_control_validate_protocol(struct file *file,
255 					       const char __user *buf,
256 					       size_t count)
257 {
258 	ssize_t ret;
259 	char kbuf[OCFS2_CONTROL_PROTO_LEN];
260 
261 	ret = ocfs2_control_cfu(kbuf, OCFS2_CONTROL_PROTO_LEN,
262 				buf, count);
263 	if (ret)
264 		return ret;
265 
266 	if (strncmp(kbuf, OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN))
267 		return -EINVAL;
268 
269 	ocfs2_control_set_handshake_state(file,
270 					  OCFS2_CONTROL_HANDSHAKE_PROTOCOL);
271 
272 	return count;
273 }
274 
275 static void ocfs2_control_send_down(const char *uuid,
276 				    int nodenum)
277 {
278 	struct ocfs2_live_connection *c;
279 
280 	mutex_lock(&ocfs2_control_lock);
281 
282 	c = ocfs2_connection_find(uuid);
283 	if (c) {
284 		BUG_ON(c->oc_conn == NULL);
285 		c->oc_conn->cc_recovery_handler(nodenum,
286 						c->oc_conn->cc_recovery_data);
287 	}
288 
289 	mutex_unlock(&ocfs2_control_lock);
290 }
291 
292 /*
293  * Called whenever configuration elements are sent to /dev/ocfs2_control.
294  * If all configuration elements are present, try to set the global
295  * values.  If there is a problem, return an error.  Skip any missing
296  * elements, and only bump ocfs2_control_opened when we have all elements
297  * and are successful.
298  */
299 static int ocfs2_control_install_private(struct file *file)
300 {
301 	int rc = 0;
302 	int set_p = 1;
303 	struct ocfs2_control_private *p = file->private_data;
304 
305 	BUG_ON(p->op_state != OCFS2_CONTROL_HANDSHAKE_PROTOCOL);
306 
307 	mutex_lock(&ocfs2_control_lock);
308 
309 	if (p->op_this_node < 0) {
310 		set_p = 0;
311 	} else if ((ocfs2_control_this_node >= 0) &&
312 		   (ocfs2_control_this_node != p->op_this_node)) {
313 		rc = -EINVAL;
314 		goto out_unlock;
315 	}
316 
317 	if (!p->op_proto.pv_major) {
318 		set_p = 0;
319 	} else if (!list_empty(&ocfs2_live_connection_list) &&
320 		   ((running_proto.pv_major != p->op_proto.pv_major) ||
321 		    (running_proto.pv_minor != p->op_proto.pv_minor))) {
322 		rc = -EINVAL;
323 		goto out_unlock;
324 	}
325 
326 	if (set_p) {
327 		ocfs2_control_this_node = p->op_this_node;
328 		running_proto.pv_major = p->op_proto.pv_major;
329 		running_proto.pv_minor = p->op_proto.pv_minor;
330 		atomic_inc(&ocfs2_control_opened);
331 		ocfs2_control_set_handshake_state(file,
332 					OCFS2_CONTROL_HANDSHAKE_VALID);
333 	}
334 
335 out_unlock:
336 	mutex_unlock(&ocfs2_control_lock);
337 
338 	return rc;
339 }
340 
341 static int ocfs2_control_get_this_node(void)
342 {
343 	int rc;
344 
345 	mutex_lock(&ocfs2_control_lock);
346 	if (ocfs2_control_this_node < 0)
347 		rc = -EINVAL;
348 	else
349 		rc = ocfs2_control_this_node;
350 	mutex_unlock(&ocfs2_control_lock);
351 
352 	return rc;
353 }
354 
355 static int ocfs2_control_do_setnode_msg(struct file *file,
356 					struct ocfs2_control_message_setn *msg)
357 {
358 	long nodenum;
359 	struct ocfs2_control_private *p = file->private_data;
360 
361 	if (ocfs2_control_get_handshake_state(file) !=
362 	    OCFS2_CONTROL_HANDSHAKE_PROTOCOL)
363 		return -EINVAL;
364 
365 	if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP,
366 		    OCFS2_CONTROL_MESSAGE_OP_LEN))
367 		return -EINVAL;
368 
369 	if ((msg->space != ' ') || (msg->newline != '\n'))
370 		return -EINVAL;
371 	msg->space = msg->newline = '\0';
372 
373 	if (kstrtol(msg->nodestr, 16, &nodenum))
374 		return -EINVAL;
375 
376 	if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) ||
377 	    (nodenum > INT_MAX) || (nodenum < 0))
378 		return -ERANGE;
379 	p->op_this_node = nodenum;
380 
381 	return ocfs2_control_install_private(file);
382 }
383 
384 static int ocfs2_control_do_setversion_msg(struct file *file,
385 					   struct ocfs2_control_message_setv *msg)
386 {
387 	long major, minor;
388 	struct ocfs2_control_private *p = file->private_data;
389 	struct ocfs2_protocol_version *max =
390 		&ocfs2_user_plugin.sp_max_proto;
391 
392 	if (ocfs2_control_get_handshake_state(file) !=
393 	    OCFS2_CONTROL_HANDSHAKE_PROTOCOL)
394 		return -EINVAL;
395 
396 	if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP,
397 		    OCFS2_CONTROL_MESSAGE_OP_LEN))
398 		return -EINVAL;
399 
400 	if ((msg->space1 != ' ') || (msg->space2 != ' ') ||
401 	    (msg->newline != '\n'))
402 		return -EINVAL;
403 	msg->space1 = msg->space2 = msg->newline = '\0';
404 
405 	if (kstrtol(msg->major, 16, &major))
406 		return -EINVAL;
407 	if (kstrtol(msg->minor, 16, &minor))
408 		return -EINVAL;
409 
410 	/*
411 	 * The major must be between 1 and 255, inclusive.  The minor
412 	 * must be between 0 and 255, inclusive.  The version passed in
413 	 * must be within the maximum version supported by the filesystem.
414 	 */
415 	if ((major == LONG_MIN) || (major == LONG_MAX) ||
416 	    (major > (u8)-1) || (major < 1))
417 		return -ERANGE;
418 	if ((minor == LONG_MIN) || (minor == LONG_MAX) ||
419 	    (minor > (u8)-1) || (minor < 0))
420 		return -ERANGE;
421 	if ((major != max->pv_major) ||
422 	    (minor > max->pv_minor))
423 		return -EINVAL;
424 
425 	p->op_proto.pv_major = major;
426 	p->op_proto.pv_minor = minor;
427 
428 	return ocfs2_control_install_private(file);
429 }
430 
431 static int ocfs2_control_do_down_msg(struct file *file,
432 				     struct ocfs2_control_message_down *msg)
433 {
434 	long nodenum;
435 
436 	if (ocfs2_control_get_handshake_state(file) !=
437 	    OCFS2_CONTROL_HANDSHAKE_VALID)
438 		return -EINVAL;
439 
440 	if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_DOWN_OP,
441 		    OCFS2_CONTROL_MESSAGE_OP_LEN))
442 		return -EINVAL;
443 
444 	if ((msg->space1 != ' ') || (msg->space2 != ' ') ||
445 	    (msg->newline != '\n'))
446 		return -EINVAL;
447 	msg->space1 = msg->space2 = msg->newline = '\0';
448 
449 	if (kstrtol(msg->nodestr, 16, &nodenum))
450 		return -EINVAL;
451 
452 	if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) ||
453 	    (nodenum > INT_MAX) || (nodenum < 0))
454 		return -ERANGE;
455 
456 	ocfs2_control_send_down(msg->uuid, nodenum);
457 
458 	return 0;
459 }
460 
461 static ssize_t ocfs2_control_message(struct file *file,
462 				     const char __user *buf,
463 				     size_t count)
464 {
465 	ssize_t ret;
466 	union ocfs2_control_message msg;
467 
468 	/* Try to catch padding issues */
469 	WARN_ON(offsetof(struct ocfs2_control_message_down, uuid) !=
470 		(sizeof(msg.u_down.tag) + sizeof(msg.u_down.space1)));
471 
472 	memset(&msg, 0, sizeof(union ocfs2_control_message));
473 	ret = ocfs2_control_cfu(&msg, count, buf, count);
474 	if (ret)
475 		goto out;
476 
477 	if ((count == OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN) &&
478 	    !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP,
479 		     OCFS2_CONTROL_MESSAGE_OP_LEN))
480 		ret = ocfs2_control_do_setnode_msg(file, &msg.u_setn);
481 	else if ((count == OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN) &&
482 		 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP,
483 			  OCFS2_CONTROL_MESSAGE_OP_LEN))
484 		ret = ocfs2_control_do_setversion_msg(file, &msg.u_setv);
485 	else if ((count == OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN) &&
486 		 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_DOWN_OP,
487 			  OCFS2_CONTROL_MESSAGE_OP_LEN))
488 		ret = ocfs2_control_do_down_msg(file, &msg.u_down);
489 	else
490 		ret = -EINVAL;
491 
492 out:
493 	return ret ? ret : count;
494 }
495 
496 static ssize_t ocfs2_control_write(struct file *file,
497 				   const char __user *buf,
498 				   size_t count,
499 				   loff_t *ppos)
500 {
501 	ssize_t ret;
502 
503 	switch (ocfs2_control_get_handshake_state(file)) {
504 		case OCFS2_CONTROL_HANDSHAKE_INVALID:
505 			ret = -EINVAL;
506 			break;
507 
508 		case OCFS2_CONTROL_HANDSHAKE_READ:
509 			ret = ocfs2_control_validate_protocol(file, buf,
510 							      count);
511 			break;
512 
513 		case OCFS2_CONTROL_HANDSHAKE_PROTOCOL:
514 		case OCFS2_CONTROL_HANDSHAKE_VALID:
515 			ret = ocfs2_control_message(file, buf, count);
516 			break;
517 
518 		default:
519 			BUG();
520 			ret = -EIO;
521 			break;
522 	}
523 
524 	return ret;
525 }
526 
527 /*
528  * This is a naive version.  If we ever have a new protocol, we'll expand
529  * it.  Probably using seq_file.
530  */
531 static ssize_t ocfs2_control_read(struct file *file,
532 				  char __user *buf,
533 				  size_t count,
534 				  loff_t *ppos)
535 {
536 	ssize_t ret;
537 
538 	ret = simple_read_from_buffer(buf, count, ppos,
539 			OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN);
540 
541 	/* Have we read the whole protocol list? */
542 	if (ret > 0 && *ppos >= OCFS2_CONTROL_PROTO_LEN)
543 		ocfs2_control_set_handshake_state(file,
544 						  OCFS2_CONTROL_HANDSHAKE_READ);
545 
546 	return ret;
547 }
548 
549 static int ocfs2_control_release(struct inode *inode, struct file *file)
550 {
551 	struct ocfs2_control_private *p = file->private_data;
552 
553 	mutex_lock(&ocfs2_control_lock);
554 
555 	if (ocfs2_control_get_handshake_state(file) !=
556 	    OCFS2_CONTROL_HANDSHAKE_VALID)
557 		goto out;
558 
559 	if (atomic_dec_and_test(&ocfs2_control_opened)) {
560 		if (!list_empty(&ocfs2_live_connection_list)) {
561 			/* XXX: Do bad things! */
562 			printk(KERN_ERR
563 			       "ocfs2: Unexpected release of ocfs2_control!\n"
564 			       "       Loss of cluster connection requires "
565 			       "an emergency restart!\n");
566 			emergency_restart();
567 		}
568 		/*
569 		 * Last valid close clears the node number and resets
570 		 * the locking protocol version
571 		 */
572 		ocfs2_control_this_node = -1;
573 		running_proto.pv_major = 0;
574 		running_proto.pv_minor = 0;
575 	}
576 
577 out:
578 	list_del_init(&p->op_list);
579 	file->private_data = NULL;
580 
581 	mutex_unlock(&ocfs2_control_lock);
582 
583 	kfree(p);
584 
585 	return 0;
586 }
587 
588 static int ocfs2_control_open(struct inode *inode, struct file *file)
589 {
590 	struct ocfs2_control_private *p;
591 
592 	p = kzalloc_obj(struct ocfs2_control_private);
593 	if (!p)
594 		return -ENOMEM;
595 	p->op_this_node = -1;
596 
597 	mutex_lock(&ocfs2_control_lock);
598 	file->private_data = p;
599 	list_add(&p->op_list, &ocfs2_control_private_list);
600 	mutex_unlock(&ocfs2_control_lock);
601 
602 	return 0;
603 }
604 
605 static const struct file_operations ocfs2_control_fops = {
606 	.open    = ocfs2_control_open,
607 	.release = ocfs2_control_release,
608 	.read    = ocfs2_control_read,
609 	.write   = ocfs2_control_write,
610 	.owner   = THIS_MODULE,
611 	.llseek  = default_llseek,
612 };
613 
/*
 * The "ocfs2_control" misc device.  The minor number is requested with
 * MISC_DYNAMIC_MINOR rather than fixed here.
 */
static struct miscdevice ocfs2_control_device = {
	.minor		= MISC_DYNAMIC_MINOR,
	.name		= "ocfs2_control",
	.fops		= &ocfs2_control_fops,
};
619 
620 static int ocfs2_control_init(void)
621 {
622 	int rc;
623 
624 	atomic_set(&ocfs2_control_opened, 0);
625 
626 	rc = misc_register(&ocfs2_control_device);
627 	if (rc)
628 		printk(KERN_ERR
629 		       "ocfs2: Unable to register ocfs2_control device "
630 		       "(errno %d)\n",
631 		       -rc);
632 
633 	return rc;
634 }
635 
/* Undo ocfs2_control_init(): deregister the control misc device. */
static void ocfs2_control_exit(void)
{
	misc_deregister(&ocfs2_control_device);
}
640 
641 static void fsdlm_lock_ast_wrapper(void *astarg)
642 {
643 	struct ocfs2_dlm_lksb *lksb = astarg;
644 	int status = lksb->lksb_fsdlm.sb_status;
645 
646 	/*
647 	 * For now we're punting on the issue of other non-standard errors
648 	 * where we can't tell if the unlock_ast or lock_ast should be called.
649 	 * The main "other error" that's possible is EINVAL which means the
650 	 * function was called with invalid args, which shouldn't be possible
651 	 * since the caller here is under our control.  Other non-standard
652 	 * errors probably fall into the same category, or otherwise are fatal
653 	 * which means we can't carry on anyway.
654 	 */
655 
656 	if (status == -DLM_EUNLOCK || status == -DLM_ECANCEL)
657 		lksb->lksb_conn->cc_proto->lp_unlock_ast(lksb, 0);
658 	else
659 		lksb->lksb_conn->cc_proto->lp_lock_ast(lksb);
660 }
661 
662 static void fsdlm_blocking_ast_wrapper(void *astarg, int level)
663 {
664 	struct ocfs2_dlm_lksb *lksb = astarg;
665 
666 	lksb->lksb_conn->cc_proto->lp_blocking_ast(lksb, level);
667 }
668 
669 static int user_dlm_lock(struct ocfs2_cluster_connection *conn,
670 			 int mode,
671 			 struct ocfs2_dlm_lksb *lksb,
672 			 u32 flags,
673 			 void *name,
674 			 unsigned int namelen)
675 {
676 	if (!lksb->lksb_fsdlm.sb_lvbptr)
677 		lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb +
678 					     sizeof(struct dlm_lksb);
679 
680 	return dlm_lock(conn->cc_lockspace, mode, &lksb->lksb_fsdlm,
681 			flags|DLM_LKF_NODLCKWT, name, namelen, 0,
682 			fsdlm_lock_ast_wrapper, lksb,
683 			fsdlm_blocking_ast_wrapper);
684 }
685 
686 static int user_dlm_unlock(struct ocfs2_cluster_connection *conn,
687 			   struct ocfs2_dlm_lksb *lksb,
688 			   u32 flags)
689 {
690 	return dlm_unlock(conn->cc_lockspace, lksb->lksb_fsdlm.sb_lkid,
691 			  flags, &lksb->lksb_fsdlm, lksb);
692 }
693 
/* Stack operation: return the sb_status field of the fs/dlm lksb. */
static int user_dlm_lock_status(struct ocfs2_dlm_lksb *lksb)
{
	return lksb->lksb_fsdlm.sb_status;
}
698 
699 static int user_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb)
700 {
701 	int invalid = lksb->lksb_fsdlm.sb_flags & DLM_SBF_VALNOTVALID;
702 
703 	return !invalid;
704 }
705 
706 static void *user_dlm_lvb(struct ocfs2_dlm_lksb *lksb)
707 {
708 	if (!lksb->lksb_fsdlm.sb_lvbptr)
709 		lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb +
710 					     sizeof(struct dlm_lksb);
711 	return (void *)(lksb->lksb_fsdlm.sb_lvbptr);
712 }
713 
/* Stack operation: intentionally empty; nothing is dumped for this stack. */
static void user_dlm_dump_lksb(struct ocfs2_dlm_lksb *lksb)
{
}
717 
718 static int user_plock(struct ocfs2_cluster_connection *conn,
719 		      u64 ino,
720 		      struct file *file,
721 		      int cmd,
722 		      struct file_lock *fl)
723 {
724 	/*
725 	 * This more or less just demuxes the plock request into any
726 	 * one of three dlm calls.
727 	 *
728 	 * Internally, fs/dlm will pass these to a misc device, which
729 	 * a userspace daemon will read and write to.
730 	 */
731 
732 	if (cmd == F_CANCELLK)
733 		return dlm_posix_cancel(conn->cc_lockspace, ino, file, fl);
734 	else if (IS_GETLK(cmd))
735 		return dlm_posix_get(conn->cc_lockspace, ino, file, fl);
736 	else if (lock_is_unlock(fl))
737 		return dlm_posix_unlock(conn->cc_lockspace, ino, file, fl);
738 	else
739 		return dlm_posix_lock(conn->cc_lockspace, ino, file, cmd, fl);
740 }
741 
742 /*
743  * Compare a requested locking protocol version against the current one.
744  *
745  * If the major numbers are different, they are incompatible.
746  * If the current minor is greater than the request, they are incompatible.
747  * If the current minor is less than or equal to the request, they are
748  * compatible, and the requester should run at the current minor version.
749  */
750 static int fs_protocol_compare(struct ocfs2_protocol_version *existing,
751 			       struct ocfs2_protocol_version *request)
752 {
753 	if (existing->pv_major != request->pv_major)
754 		return 1;
755 
756 	if (existing->pv_minor > request->pv_minor)
757 		return 1;
758 
759 	if (existing->pv_minor < request->pv_minor)
760 		request->pv_minor = existing->pv_minor;
761 
762 	return 0;
763 }
764 
765 static void lvb_to_version(char *lvb, struct ocfs2_protocol_version *ver)
766 {
767 	struct ocfs2_protocol_version *pv =
768 		(struct ocfs2_protocol_version *)lvb;
769 	/*
770 	 * ocfs2_protocol_version has two u8 variables, so we don't
771 	 * need any endian conversion.
772 	 */
773 	ver->pv_major = pv->pv_major;
774 	ver->pv_minor = pv->pv_minor;
775 }
776 
777 static void version_to_lvb(struct ocfs2_protocol_version *ver, char *lvb)
778 {
779 	struct ocfs2_protocol_version *pv =
780 		(struct ocfs2_protocol_version *)lvb;
781 	/*
782 	 * ocfs2_protocol_version has two u8 variables, so we don't
783 	 * need any endian conversion.
784 	 */
785 	pv->pv_major = ver->pv_major;
786 	pv->pv_minor = ver->pv_minor;
787 }
788 
789 static void sync_wait_cb(void *arg)
790 {
791 	struct ocfs2_cluster_connection *conn = arg;
792 	struct ocfs2_live_connection *lc = conn->cc_private;
793 	complete(&lc->oc_sync_wait);
794 }
795 
796 static int sync_unlock(struct ocfs2_cluster_connection *conn,
797 		struct dlm_lksb *lksb, char *name)
798 {
799 	int error;
800 	struct ocfs2_live_connection *lc = conn->cc_private;
801 
802 	error = dlm_unlock(conn->cc_lockspace, lksb->sb_lkid, 0, lksb, conn);
803 	if (error) {
804 		printk(KERN_ERR "%s lkid %x error %d\n",
805 				name, lksb->sb_lkid, error);
806 		return error;
807 	}
808 
809 	wait_for_completion(&lc->oc_sync_wait);
810 
811 	if (lksb->sb_status != -DLM_EUNLOCK) {
812 		printk(KERN_ERR "%s lkid %x status %d\n",
813 				name, lksb->sb_lkid, lksb->sb_status);
814 		return -1;
815 	}
816 	return 0;
817 }
818 
819 static int sync_lock(struct ocfs2_cluster_connection *conn,
820 		int mode, uint32_t flags,
821 		struct dlm_lksb *lksb, char *name)
822 {
823 	int error, status;
824 	struct ocfs2_live_connection *lc = conn->cc_private;
825 
826 	error = dlm_lock(conn->cc_lockspace, mode, lksb, flags,
827 			name, strlen(name),
828 			0, sync_wait_cb, conn, NULL);
829 	if (error) {
830 		printk(KERN_ERR "%s lkid %x flags %x mode %d error %d\n",
831 				name, lksb->sb_lkid, flags, mode, error);
832 		return error;
833 	}
834 
835 	wait_for_completion(&lc->oc_sync_wait);
836 
837 	status = lksb->sb_status;
838 
839 	if (status && status != -EAGAIN) {
840 		printk(KERN_ERR "%s lkid %x flags %x mode %d status %d\n",
841 				name, lksb->sb_lkid, flags, mode, status);
842 	}
843 
844 	return status;
845 }
846 
847 
848 static int version_lock(struct ocfs2_cluster_connection *conn, int mode,
849 		int flags)
850 {
851 	struct ocfs2_live_connection *lc = conn->cc_private;
852 	return sync_lock(conn, mode, flags,
853 			&lc->oc_version_lksb, VERSION_LOCK);
854 }
855 
856 static int version_unlock(struct ocfs2_cluster_connection *conn)
857 {
858 	struct ocfs2_live_connection *lc = conn->cc_private;
859 	return sync_unlock(conn, &lc->oc_version_lksb, VERSION_LOCK);
860 }
861 
862 /* get_protocol_version()
863  *
864  * To exchange ocfs2 versioning, we use the LVB of the version dlm lock.
865  * The algorithm is:
866  * 1. Attempt to take the lock in EX mode (non-blocking).
867  * 2. If successful (which means it is the first mount), write the
868  *    version number and downconvert to PR lock.
869  * 3. If unsuccessful (returns -EAGAIN), read the version from the LVB after
870  *    taking the PR lock.
871  */
872 
873 static int get_protocol_version(struct ocfs2_cluster_connection *conn)
874 {
875 	int ret;
876 	struct ocfs2_live_connection *lc = conn->cc_private;
877 	struct ocfs2_protocol_version pv;
878 
879 	running_proto.pv_major =
880 		ocfs2_user_plugin.sp_max_proto.pv_major;
881 	running_proto.pv_minor =
882 		ocfs2_user_plugin.sp_max_proto.pv_minor;
883 
884 	lc->oc_version_lksb.sb_lvbptr = lc->oc_lvb;
885 	ret = version_lock(conn, DLM_LOCK_EX,
886 			DLM_LKF_VALBLK|DLM_LKF_NOQUEUE);
887 	if (!ret) {
888 		conn->cc_version.pv_major = running_proto.pv_major;
889 		conn->cc_version.pv_minor = running_proto.pv_minor;
890 		version_to_lvb(&running_proto, lc->oc_lvb);
891 		version_lock(conn, DLM_LOCK_PR, DLM_LKF_CONVERT|DLM_LKF_VALBLK);
892 	} else if (ret == -EAGAIN) {
893 		ret = version_lock(conn, DLM_LOCK_PR, DLM_LKF_VALBLK);
894 		if (ret)
895 			goto out;
896 		lvb_to_version(lc->oc_lvb, &pv);
897 
898 		if ((pv.pv_major != running_proto.pv_major) ||
899 				(pv.pv_minor > running_proto.pv_minor)) {
900 			ret = -EINVAL;
901 			goto out;
902 		}
903 
904 		conn->cc_version.pv_major = pv.pv_major;
905 		conn->cc_version.pv_minor = pv.pv_minor;
906 	}
907 out:
908 	return ret;
909 }
910 
/* dlm recover_prep callback: intentionally empty. */
static void user_recover_prep(void *arg)
{
}
914 
915 static void user_recover_slot(void *arg, struct dlm_slot *slot)
916 {
917 	struct ocfs2_cluster_connection *conn = arg;
918 	printk(KERN_INFO "ocfs2: Node %d/%d down. Initiating recovery.\n",
919 			slot->nodeid, slot->slot);
920 	conn->cc_recovery_handler(slot->nodeid, conn->cc_recovery_data);
921 
922 }
923 
924 static void user_recover_done(void *arg, struct dlm_slot *slots,
925 		int num_slots, int our_slot,
926 		uint32_t generation)
927 {
928 	struct ocfs2_cluster_connection *conn = arg;
929 	struct ocfs2_live_connection *lc = conn->cc_private;
930 	int i;
931 
932 	for (i = 0; i < num_slots; i++)
933 		if (slots[i].slot == our_slot) {
934 			atomic_set(&lc->oc_this_node, slots[i].nodeid);
935 			break;
936 		}
937 
938 	lc->oc_our_slot = our_slot;
939 	wake_up(&lc->oc_wait);
940 }
941 
/* Recovery callbacks passed to dlm_new_lockspace() in user_cluster_connect(). */
static const struct dlm_lockspace_ops ocfs2_ls_ops = {
	.recover_prep = user_recover_prep,
	.recover_slot = user_recover_slot,
	.recover_done = user_recover_done,
};
947 
/*
 * Stack operation: tear down a connection set up by
 * user_cluster_connect().  Always returns 0.
 */
static int user_cluster_disconnect(struct ocfs2_cluster_connection *conn)
{
	/* The result of releasing the version lock is not checked. */
	version_unlock(conn);
	dlm_release_lockspace(conn->cc_lockspace, DLM_RELEASE_NORMAL);
	conn->cc_lockspace = NULL;
	/* Unlinks and frees the live connection; cc_private is stale after this. */
	ocfs2_live_connection_drop(conn->cc_private);
	conn->cc_private = NULL;
	return 0;
}
957 
958 static int user_cluster_connect(struct ocfs2_cluster_connection *conn)
959 {
960 	dlm_lockspace_t *fsdlm;
961 	struct ocfs2_live_connection *lc;
962 	int rc, ops_rv;
963 
964 	BUG_ON(conn == NULL);
965 
966 	lc = kzalloc_obj(struct ocfs2_live_connection);
967 	if (!lc)
968 		return -ENOMEM;
969 
970 	init_waitqueue_head(&lc->oc_wait);
971 	init_completion(&lc->oc_sync_wait);
972 	atomic_set(&lc->oc_this_node, 0);
973 	conn->cc_private = lc;
974 	lc->oc_type = NO_CONTROLD;
975 
976 	rc = dlm_new_lockspace(conn->cc_name, conn->cc_cluster_name,
977 			       DLM_LSFL_NEWEXCL, DLM_LVB_LEN,
978 			       &ocfs2_ls_ops, conn, &ops_rv, &fsdlm);
979 	if (rc) {
980 		if (rc == -EEXIST || rc == -EPROTO)
981 			printk(KERN_ERR "ocfs2: Unable to create the "
982 				"lockspace %s (%d), because a ocfs2-tools "
983 				"program is running on this file system "
984 				"with the same name lockspace\n",
985 				conn->cc_name, rc);
986 		goto out;
987 	}
988 
989 	if (ops_rv == -EOPNOTSUPP) {
990 		lc->oc_type = WITH_CONTROLD;
991 		printk(KERN_NOTICE "ocfs2: You seem to be using an older "
992 				"version of dlm_controld and/or ocfs2-tools."
993 				" Please consider upgrading.\n");
994 	} else if (ops_rv) {
995 		rc = ops_rv;
996 		goto out;
997 	}
998 	conn->cc_lockspace = fsdlm;
999 
1000 	rc = ocfs2_live_connection_attach(conn, lc);
1001 	if (rc)
1002 		goto out;
1003 
1004 	if (lc->oc_type == NO_CONTROLD) {
1005 		rc = get_protocol_version(conn);
1006 		if (rc) {
1007 			printk(KERN_ERR "ocfs2: Could not determine"
1008 					" locking version\n");
1009 			user_cluster_disconnect(conn);
1010 			lc = NULL;
1011 			goto out;
1012 		}
1013 		wait_event(lc->oc_wait, (atomic_read(&lc->oc_this_node) > 0));
1014 	}
1015 
1016 	/*
1017 	 * running_proto must have been set before we allowed any mounts
1018 	 * to proceed.
1019 	 */
1020 	if (fs_protocol_compare(&running_proto, &conn->cc_version)) {
1021 		printk(KERN_ERR
1022 		       "Unable to mount with fs locking protocol version "
1023 		       "%u.%u because negotiated protocol is %u.%u\n",
1024 		       conn->cc_version.pv_major, conn->cc_version.pv_minor,
1025 		       running_proto.pv_major, running_proto.pv_minor);
1026 		rc = -EPROTO;
1027 		ocfs2_live_connection_drop(lc);
1028 		lc = NULL;
1029 	}
1030 
1031 out:
1032 	if (rc)
1033 		kfree(lc);
1034 	return rc;
1035 }
1036 
1037 
1038 static int user_cluster_this_node(struct ocfs2_cluster_connection *conn,
1039 				  unsigned int *this_node)
1040 {
1041 	int rc;
1042 	struct ocfs2_live_connection *lc = conn->cc_private;
1043 
1044 	if (lc->oc_type == WITH_CONTROLD)
1045 		rc = ocfs2_control_get_this_node();
1046 	else if (lc->oc_type == NO_CONTROLD)
1047 		rc = atomic_read(&lc->oc_this_node);
1048 	else
1049 		rc = -EINVAL;
1050 
1051 	if (rc < 0)
1052 		return rc;
1053 
1054 	*this_node = rc;
1055 	return 0;
1056 }
1057 
/* Stack operations implemented by this file, referenced from ocfs2_user_plugin. */
static const struct ocfs2_stack_operations ocfs2_user_plugin_ops = {
	.connect	= user_cluster_connect,
	.disconnect	= user_cluster_disconnect,
	.this_node	= user_cluster_this_node,
	.dlm_lock	= user_dlm_lock,
	.dlm_unlock	= user_dlm_unlock,
	.lock_status	= user_dlm_lock_status,
	.lvb_valid	= user_dlm_lvb_valid,
	.lock_lvb	= user_dlm_lvb,
	.plock		= user_plock,
	.dump_lksb	= user_dlm_dump_lksb,
};
1070 
/*
 * The "user" stack plugin registered with the stack glue.
 * NOTE(review): sp_max_proto is read in this file but not initialized
 * here; presumably the stack glue fills it in - confirm.
 */
static struct ocfs2_stack_plugin ocfs2_user_plugin = {
	.sp_name	= "user",
	.sp_ops		= &ocfs2_user_plugin_ops,
	.sp_owner	= THIS_MODULE,
};
1076 
1077 
1078 static int __init ocfs2_user_plugin_init(void)
1079 {
1080 	int rc;
1081 
1082 	rc = ocfs2_control_init();
1083 	if (!rc) {
1084 		rc = ocfs2_stack_glue_register(&ocfs2_user_plugin);
1085 		if (rc)
1086 			ocfs2_control_exit();
1087 	}
1088 
1089 	return rc;
1090 }
1091 
/*
 * Module exit: undo ocfs2_user_plugin_init() in reverse order - first
 * unregister the plugin from the stack glue, then remove the control
 * device.
 */
static void __exit ocfs2_user_plugin_exit(void)
{
	ocfs2_stack_glue_unregister(&ocfs2_user_plugin);
	ocfs2_control_exit();
}
1097 
/* Module metadata and the init/exit entry points defined above. */
MODULE_AUTHOR("Oracle");
MODULE_DESCRIPTION("ocfs2 driver for userspace cluster stacks");
MODULE_LICENSE("GPL");
module_init(ocfs2_user_plugin_init);
module_exit(ocfs2_user_plugin_exit);
1103