// SPDX-License-Identifier: GPL-2.0-only
/*
 * stack_user.c
 *
 * Code which interfaces ocfs2 with fs/dlm and a userspace stack.
 *
 * Copyright (C) 2007 Oracle. All rights reserved.
 */

#include <linux/module.h>
#include <linux/fs.h>
#include <linux/filelock.h>
#include <linux/miscdevice.h>
#include <linux/mutex.h>
#include <linux/slab.h>
#include <linux/reboot.h>
#include <linux/sched.h>
#include <linux/uaccess.h>

#include "stackglue.h"

#include <linux/dlm_plock.h>

/*
 * The control protocol starts with a handshake. Until the handshake
 * is complete, the control device will fail all write(2)s.
 *
 * The handshake is simple. First, the client reads until EOF. Each line
 * of output is a supported protocol tag. All protocol tags are a single
 * character followed by a two hex digit version number. Currently the
 * only tag supported is T01, for "Text-based version 0x01". Next, the
 * client writes the version they would like to use, including the newline.
 * Thus, the protocol tag is 'T01\n'. If the version tag written is
 * unknown, -EINVAL is returned. Once the negotiation is complete, the
 * client can start sending messages.
 *
 * The T01 protocol has three messages. First is the "SETN" message.
 * It has the following syntax:
 *
 *  SETN<space><8-char-hex-nodenum><newline>
 *
 * This is 14 characters.
 *
 * The "SETN" message must be the first message following the protocol.
 * It tells ocfs2_control the local node number.
 *
 * Next comes the "SETV" message. It has the following syntax:
 *
 *  SETV<space><2-char-hex-major><space><2-char-hex-minor><newline>
 *
 * This is 11 characters.
 *
 * The "SETV" message sets the filesystem locking protocol version as
 * negotiated by the client. The client negotiates based on the maximum
 * version advertised in /sys/fs/ocfs2/max_locking_protocol. The major
 * number from the "SETV" message must match
 * ocfs2_user_plugin.sp_max_proto.pv_major, and the minor number
 * must be less than or equal to ocfs2_user_plugin.sp_max_proto.pv_minor.
 *
 * Once this information has been set, mounts will be allowed. From this
 * point on, the "DOWN" message can be sent for node down notification.
 * It has the following syntax:
 *
 *  DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline>
 *
 * eg:
 *
 *  DOWN 632A924FDD844190BDA93C0DF6B94899 00000001\n
 *
 * This is 47 characters.
 */

/*
 * Whether or not the client has done the handshake.
 * For now, we have just one protocol version.
 */
#define OCFS2_CONTROL_PROTO "T01\n"
#define OCFS2_CONTROL_PROTO_LEN 4

/*
 * Handshake states, stored per open file in ocfs2_control_private.op_state:
 *   INVALID  - freshly opened, nothing read yet
 *   READ     - the client has read the protocol list to its end
 *   PROTOCOL - the client has written a supported protocol tag
 *   VALID    - node number and locking protocol have both been installed
 */
#define OCFS2_CONTROL_HANDSHAKE_INVALID		(0)
#define OCFS2_CONTROL_HANDSHAKE_READ		(1)
#define OCFS2_CONTROL_HANDSHAKE_PROTOCOL	(2)
#define OCFS2_CONTROL_HANDSHAKE_VALID		(3)

/* Messages: op tags, total on-the-wire lengths and field widths */
#define OCFS2_CONTROL_MESSAGE_OP_LEN		4
#define OCFS2_CONTROL_MESSAGE_SETNODE_OP	"SETN"
#define OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN	14
#define OCFS2_CONTROL_MESSAGE_SETVERSION_OP	"SETV"
#define OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN	11
#define OCFS2_CONTROL_MESSAGE_DOWN_OP		"DOWN"
#define OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN	47
#define OCFS2_TEXT_UUID_LEN			32
#define OCFS2_CONTROL_MESSAGE_VERNUM_LEN	2
#define OCFS2_CONTROL_MESSAGE_NODENUM_LEN	8
/* Name of the DLM lock whose LVB carries the locking protocol version */
#define VERSION_LOCK				"version_lock"

/*
 * How a connection learns its node number and protocol version:
 * WITH_CONTROLD connections use the values written to the ocfs2_control
 * device; NO_CONTROLD connections get them from the DLM recovery
 * callbacks and the version lock.
 */
enum ocfs2_connection_type {
	WITH_CONTROLD,
	NO_CONTROLD
};

/*
 * ocfs2_live_connection is refcounted because the filesystem and
 * miscdevice sides can detach in different order. Let's just be safe.
107 */ 108 struct ocfs2_live_connection { 109 struct list_head oc_list; 110 struct ocfs2_cluster_connection *oc_conn; 111 enum ocfs2_connection_type oc_type; 112 atomic_t oc_this_node; 113 int oc_our_slot; 114 struct dlm_lksb oc_version_lksb; 115 char oc_lvb[DLM_LVB_LEN]; 116 struct completion oc_sync_wait; 117 wait_queue_head_t oc_wait; 118 }; 119 120 struct ocfs2_control_private { 121 struct list_head op_list; 122 int op_state; 123 int op_this_node; 124 struct ocfs2_protocol_version op_proto; 125 }; 126 127 /* SETN<space><8-char-hex-nodenum><newline> */ 128 struct ocfs2_control_message_setn { 129 char tag[OCFS2_CONTROL_MESSAGE_OP_LEN]; 130 char space; 131 char nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN]; 132 char newline; 133 }; 134 135 /* SETV<space><2-char-hex-major><space><2-char-hex-minor><newline> */ 136 struct ocfs2_control_message_setv { 137 char tag[OCFS2_CONTROL_MESSAGE_OP_LEN]; 138 char space1; 139 char major[OCFS2_CONTROL_MESSAGE_VERNUM_LEN]; 140 char space2; 141 char minor[OCFS2_CONTROL_MESSAGE_VERNUM_LEN]; 142 char newline; 143 }; 144 145 /* DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline> */ 146 struct ocfs2_control_message_down { 147 char tag[OCFS2_CONTROL_MESSAGE_OP_LEN]; 148 char space1; 149 char uuid[OCFS2_TEXT_UUID_LEN]; 150 char space2; 151 char nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN]; 152 char newline; 153 }; 154 155 union ocfs2_control_message { 156 char tag[OCFS2_CONTROL_MESSAGE_OP_LEN]; 157 struct ocfs2_control_message_setn u_setn; 158 struct ocfs2_control_message_setv u_setv; 159 struct ocfs2_control_message_down u_down; 160 }; 161 162 static struct ocfs2_stack_plugin ocfs2_user_plugin; 163 164 static atomic_t ocfs2_control_opened; 165 static int ocfs2_control_this_node = -1; 166 static struct ocfs2_protocol_version running_proto; 167 168 static LIST_HEAD(ocfs2_live_connection_list); 169 static LIST_HEAD(ocfs2_control_private_list); 170 static DEFINE_MUTEX(ocfs2_control_lock); 171 172 static inline void 
ocfs2_control_set_handshake_state(struct file *file, 173 int state) 174 { 175 struct ocfs2_control_private *p = file->private_data; 176 p->op_state = state; 177 } 178 179 static inline int ocfs2_control_get_handshake_state(struct file *file) 180 { 181 struct ocfs2_control_private *p = file->private_data; 182 return p->op_state; 183 } 184 185 static struct ocfs2_live_connection *ocfs2_connection_find(const char *name) 186 { 187 size_t len = strlen(name); 188 struct ocfs2_live_connection *c; 189 190 BUG_ON(!mutex_is_locked(&ocfs2_control_lock)); 191 192 list_for_each_entry(c, &ocfs2_live_connection_list, oc_list) { 193 if ((c->oc_conn->cc_namelen == len) && 194 !strncmp(c->oc_conn->cc_name, name, len)) 195 return c; 196 } 197 198 return NULL; 199 } 200 201 /* 202 * ocfs2_live_connection structures are created underneath the ocfs2 203 * mount path. Since the VFS prevents multiple calls to 204 * fill_super(), we can't get dupes here. 205 */ 206 static int ocfs2_live_connection_attach(struct ocfs2_cluster_connection *conn, 207 struct ocfs2_live_connection *c) 208 { 209 int rc = 0; 210 211 mutex_lock(&ocfs2_control_lock); 212 c->oc_conn = conn; 213 214 if ((c->oc_type == NO_CONTROLD) || atomic_read(&ocfs2_control_opened)) 215 list_add(&c->oc_list, &ocfs2_live_connection_list); 216 else { 217 printk(KERN_ERR 218 "ocfs2: Userspace control daemon is not present\n"); 219 rc = -ESRCH; 220 } 221 222 mutex_unlock(&ocfs2_control_lock); 223 return rc; 224 } 225 226 /* 227 * This function disconnects the cluster connection from ocfs2_control. 228 * Afterwards, userspace can't affect the cluster connection. 
229 */ 230 static void ocfs2_live_connection_drop(struct ocfs2_live_connection *c) 231 { 232 mutex_lock(&ocfs2_control_lock); 233 list_del_init(&c->oc_list); 234 c->oc_conn = NULL; 235 mutex_unlock(&ocfs2_control_lock); 236 237 kfree(c); 238 } 239 240 static int ocfs2_control_cfu(void *target, size_t target_len, 241 const char __user *buf, size_t count) 242 { 243 /* The T01 expects write(2) calls to have exactly one command */ 244 if ((count != target_len) || 245 (count > sizeof(union ocfs2_control_message))) 246 return -EINVAL; 247 248 if (copy_from_user(target, buf, target_len)) 249 return -EFAULT; 250 251 return 0; 252 } 253 254 static ssize_t ocfs2_control_validate_protocol(struct file *file, 255 const char __user *buf, 256 size_t count) 257 { 258 ssize_t ret; 259 char kbuf[OCFS2_CONTROL_PROTO_LEN]; 260 261 ret = ocfs2_control_cfu(kbuf, OCFS2_CONTROL_PROTO_LEN, 262 buf, count); 263 if (ret) 264 return ret; 265 266 if (strncmp(kbuf, OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN)) 267 return -EINVAL; 268 269 ocfs2_control_set_handshake_state(file, 270 OCFS2_CONTROL_HANDSHAKE_PROTOCOL); 271 272 return count; 273 } 274 275 static void ocfs2_control_send_down(const char *uuid, 276 int nodenum) 277 { 278 struct ocfs2_live_connection *c; 279 280 mutex_lock(&ocfs2_control_lock); 281 282 c = ocfs2_connection_find(uuid); 283 if (c) { 284 BUG_ON(c->oc_conn == NULL); 285 c->oc_conn->cc_recovery_handler(nodenum, 286 c->oc_conn->cc_recovery_data); 287 } 288 289 mutex_unlock(&ocfs2_control_lock); 290 } 291 292 /* 293 * Called whenever configuration elements are sent to /dev/ocfs2_control. 294 * If all configuration elements are present, try to set the global 295 * values. If there is a problem, return an error. Skip any missing 296 * elements, and only bump ocfs2_control_opened when we have all elements 297 * and are successful. 
298 */ 299 static int ocfs2_control_install_private(struct file *file) 300 { 301 int rc = 0; 302 int set_p = 1; 303 struct ocfs2_control_private *p = file->private_data; 304 305 BUG_ON(p->op_state != OCFS2_CONTROL_HANDSHAKE_PROTOCOL); 306 307 mutex_lock(&ocfs2_control_lock); 308 309 if (p->op_this_node < 0) { 310 set_p = 0; 311 } else if ((ocfs2_control_this_node >= 0) && 312 (ocfs2_control_this_node != p->op_this_node)) { 313 rc = -EINVAL; 314 goto out_unlock; 315 } 316 317 if (!p->op_proto.pv_major) { 318 set_p = 0; 319 } else if (!list_empty(&ocfs2_live_connection_list) && 320 ((running_proto.pv_major != p->op_proto.pv_major) || 321 (running_proto.pv_minor != p->op_proto.pv_minor))) { 322 rc = -EINVAL; 323 goto out_unlock; 324 } 325 326 if (set_p) { 327 ocfs2_control_this_node = p->op_this_node; 328 running_proto.pv_major = p->op_proto.pv_major; 329 running_proto.pv_minor = p->op_proto.pv_minor; 330 } 331 332 out_unlock: 333 mutex_unlock(&ocfs2_control_lock); 334 335 if (!rc && set_p) { 336 /* We set the global values successfully */ 337 atomic_inc(&ocfs2_control_opened); 338 ocfs2_control_set_handshake_state(file, 339 OCFS2_CONTROL_HANDSHAKE_VALID); 340 } 341 342 return rc; 343 } 344 345 static int ocfs2_control_get_this_node(void) 346 { 347 int rc; 348 349 mutex_lock(&ocfs2_control_lock); 350 if (ocfs2_control_this_node < 0) 351 rc = -EINVAL; 352 else 353 rc = ocfs2_control_this_node; 354 mutex_unlock(&ocfs2_control_lock); 355 356 return rc; 357 } 358 359 static int ocfs2_control_do_setnode_msg(struct file *file, 360 struct ocfs2_control_message_setn *msg) 361 { 362 long nodenum; 363 char *ptr = NULL; 364 struct ocfs2_control_private *p = file->private_data; 365 366 if (ocfs2_control_get_handshake_state(file) != 367 OCFS2_CONTROL_HANDSHAKE_PROTOCOL) 368 return -EINVAL; 369 370 if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP, 371 OCFS2_CONTROL_MESSAGE_OP_LEN)) 372 return -EINVAL; 373 374 if ((msg->space != ' ') || (msg->newline != '\n')) 375 return 
-EINVAL; 376 msg->space = msg->newline = '\0'; 377 378 nodenum = simple_strtol(msg->nodestr, &ptr, 16); 379 if (!ptr || *ptr) 380 return -EINVAL; 381 382 if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) || 383 (nodenum > INT_MAX) || (nodenum < 0)) 384 return -ERANGE; 385 p->op_this_node = nodenum; 386 387 return ocfs2_control_install_private(file); 388 } 389 390 static int ocfs2_control_do_setversion_msg(struct file *file, 391 struct ocfs2_control_message_setv *msg) 392 { 393 long major, minor; 394 char *ptr = NULL; 395 struct ocfs2_control_private *p = file->private_data; 396 struct ocfs2_protocol_version *max = 397 &ocfs2_user_plugin.sp_max_proto; 398 399 if (ocfs2_control_get_handshake_state(file) != 400 OCFS2_CONTROL_HANDSHAKE_PROTOCOL) 401 return -EINVAL; 402 403 if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP, 404 OCFS2_CONTROL_MESSAGE_OP_LEN)) 405 return -EINVAL; 406 407 if ((msg->space1 != ' ') || (msg->space2 != ' ') || 408 (msg->newline != '\n')) 409 return -EINVAL; 410 msg->space1 = msg->space2 = msg->newline = '\0'; 411 412 major = simple_strtol(msg->major, &ptr, 16); 413 if (!ptr || *ptr) 414 return -EINVAL; 415 minor = simple_strtol(msg->minor, &ptr, 16); 416 if (!ptr || *ptr) 417 return -EINVAL; 418 419 /* 420 * The major must be between 1 and 255, inclusive. The minor 421 * must be between 0 and 255, inclusive. The version passed in 422 * must be within the maximum version supported by the filesystem. 
423 */ 424 if ((major == LONG_MIN) || (major == LONG_MAX) || 425 (major > (u8)-1) || (major < 1)) 426 return -ERANGE; 427 if ((minor == LONG_MIN) || (minor == LONG_MAX) || 428 (minor > (u8)-1) || (minor < 0)) 429 return -ERANGE; 430 if ((major != max->pv_major) || 431 (minor > max->pv_minor)) 432 return -EINVAL; 433 434 p->op_proto.pv_major = major; 435 p->op_proto.pv_minor = minor; 436 437 return ocfs2_control_install_private(file); 438 } 439 440 static int ocfs2_control_do_down_msg(struct file *file, 441 struct ocfs2_control_message_down *msg) 442 { 443 long nodenum; 444 char *p = NULL; 445 446 if (ocfs2_control_get_handshake_state(file) != 447 OCFS2_CONTROL_HANDSHAKE_VALID) 448 return -EINVAL; 449 450 if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_DOWN_OP, 451 OCFS2_CONTROL_MESSAGE_OP_LEN)) 452 return -EINVAL; 453 454 if ((msg->space1 != ' ') || (msg->space2 != ' ') || 455 (msg->newline != '\n')) 456 return -EINVAL; 457 msg->space1 = msg->space2 = msg->newline = '\0'; 458 459 nodenum = simple_strtol(msg->nodestr, &p, 16); 460 if (!p || *p) 461 return -EINVAL; 462 463 if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) || 464 (nodenum > INT_MAX) || (nodenum < 0)) 465 return -ERANGE; 466 467 ocfs2_control_send_down(msg->uuid, nodenum); 468 469 return 0; 470 } 471 472 static ssize_t ocfs2_control_message(struct file *file, 473 const char __user *buf, 474 size_t count) 475 { 476 ssize_t ret; 477 union ocfs2_control_message msg; 478 479 /* Try to catch padding issues */ 480 WARN_ON(offsetof(struct ocfs2_control_message_down, uuid) != 481 (sizeof(msg.u_down.tag) + sizeof(msg.u_down.space1))); 482 483 memset(&msg, 0, sizeof(union ocfs2_control_message)); 484 ret = ocfs2_control_cfu(&msg, count, buf, count); 485 if (ret) 486 goto out; 487 488 if ((count == OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN) && 489 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP, 490 OCFS2_CONTROL_MESSAGE_OP_LEN)) 491 ret = ocfs2_control_do_setnode_msg(file, &msg.u_setn); 492 else if ((count == 
OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN) && 493 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP, 494 OCFS2_CONTROL_MESSAGE_OP_LEN)) 495 ret = ocfs2_control_do_setversion_msg(file, &msg.u_setv); 496 else if ((count == OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN) && 497 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_DOWN_OP, 498 OCFS2_CONTROL_MESSAGE_OP_LEN)) 499 ret = ocfs2_control_do_down_msg(file, &msg.u_down); 500 else 501 ret = -EINVAL; 502 503 out: 504 return ret ? ret : count; 505 } 506 507 static ssize_t ocfs2_control_write(struct file *file, 508 const char __user *buf, 509 size_t count, 510 loff_t *ppos) 511 { 512 ssize_t ret; 513 514 switch (ocfs2_control_get_handshake_state(file)) { 515 case OCFS2_CONTROL_HANDSHAKE_INVALID: 516 ret = -EINVAL; 517 break; 518 519 case OCFS2_CONTROL_HANDSHAKE_READ: 520 ret = ocfs2_control_validate_protocol(file, buf, 521 count); 522 break; 523 524 case OCFS2_CONTROL_HANDSHAKE_PROTOCOL: 525 case OCFS2_CONTROL_HANDSHAKE_VALID: 526 ret = ocfs2_control_message(file, buf, count); 527 break; 528 529 default: 530 BUG(); 531 ret = -EIO; 532 break; 533 } 534 535 return ret; 536 } 537 538 /* 539 * This is a naive version. If we ever have a new protocol, we'll expand 540 * it. Probably using seq_file. 541 */ 542 static ssize_t ocfs2_control_read(struct file *file, 543 char __user *buf, 544 size_t count, 545 loff_t *ppos) 546 { 547 ssize_t ret; 548 549 ret = simple_read_from_buffer(buf, count, ppos, 550 OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN); 551 552 /* Have we read the whole protocol list? 
*/ 553 if (ret > 0 && *ppos >= OCFS2_CONTROL_PROTO_LEN) 554 ocfs2_control_set_handshake_state(file, 555 OCFS2_CONTROL_HANDSHAKE_READ); 556 557 return ret; 558 } 559 560 static int ocfs2_control_release(struct inode *inode, struct file *file) 561 { 562 struct ocfs2_control_private *p = file->private_data; 563 564 mutex_lock(&ocfs2_control_lock); 565 566 if (ocfs2_control_get_handshake_state(file) != 567 OCFS2_CONTROL_HANDSHAKE_VALID) 568 goto out; 569 570 if (atomic_dec_and_test(&ocfs2_control_opened)) { 571 if (!list_empty(&ocfs2_live_connection_list)) { 572 /* XXX: Do bad things! */ 573 printk(KERN_ERR 574 "ocfs2: Unexpected release of ocfs2_control!\n" 575 " Loss of cluster connection requires " 576 "an emergency restart!\n"); 577 emergency_restart(); 578 } 579 /* 580 * Last valid close clears the node number and resets 581 * the locking protocol version 582 */ 583 ocfs2_control_this_node = -1; 584 running_proto.pv_major = 0; 585 running_proto.pv_minor = 0; 586 } 587 588 out: 589 list_del_init(&p->op_list); 590 file->private_data = NULL; 591 592 mutex_unlock(&ocfs2_control_lock); 593 594 kfree(p); 595 596 return 0; 597 } 598 599 static int ocfs2_control_open(struct inode *inode, struct file *file) 600 { 601 struct ocfs2_control_private *p; 602 603 p = kzalloc(sizeof(struct ocfs2_control_private), GFP_KERNEL); 604 if (!p) 605 return -ENOMEM; 606 p->op_this_node = -1; 607 608 mutex_lock(&ocfs2_control_lock); 609 file->private_data = p; 610 list_add(&p->op_list, &ocfs2_control_private_list); 611 mutex_unlock(&ocfs2_control_lock); 612 613 return 0; 614 } 615 616 static const struct file_operations ocfs2_control_fops = { 617 .open = ocfs2_control_open, 618 .release = ocfs2_control_release, 619 .read = ocfs2_control_read, 620 .write = ocfs2_control_write, 621 .owner = THIS_MODULE, 622 .llseek = default_llseek, 623 }; 624 625 static struct miscdevice ocfs2_control_device = { 626 .minor = MISC_DYNAMIC_MINOR, 627 .name = "ocfs2_control", 628 .fops = &ocfs2_control_fops, 
629 }; 630 631 static int ocfs2_control_init(void) 632 { 633 int rc; 634 635 atomic_set(&ocfs2_control_opened, 0); 636 637 rc = misc_register(&ocfs2_control_device); 638 if (rc) 639 printk(KERN_ERR 640 "ocfs2: Unable to register ocfs2_control device " 641 "(errno %d)\n", 642 -rc); 643 644 return rc; 645 } 646 647 static void ocfs2_control_exit(void) 648 { 649 misc_deregister(&ocfs2_control_device); 650 } 651 652 static void fsdlm_lock_ast_wrapper(void *astarg) 653 { 654 struct ocfs2_dlm_lksb *lksb = astarg; 655 int status = lksb->lksb_fsdlm.sb_status; 656 657 /* 658 * For now we're punting on the issue of other non-standard errors 659 * where we can't tell if the unlock_ast or lock_ast should be called. 660 * The main "other error" that's possible is EINVAL which means the 661 * function was called with invalid args, which shouldn't be possible 662 * since the caller here is under our control. Other non-standard 663 * errors probably fall into the same category, or otherwise are fatal 664 * which means we can't carry on anyway. 
665 */ 666 667 if (status == -DLM_EUNLOCK || status == -DLM_ECANCEL) 668 lksb->lksb_conn->cc_proto->lp_unlock_ast(lksb, 0); 669 else 670 lksb->lksb_conn->cc_proto->lp_lock_ast(lksb); 671 } 672 673 static void fsdlm_blocking_ast_wrapper(void *astarg, int level) 674 { 675 struct ocfs2_dlm_lksb *lksb = astarg; 676 677 lksb->lksb_conn->cc_proto->lp_blocking_ast(lksb, level); 678 } 679 680 static int user_dlm_lock(struct ocfs2_cluster_connection *conn, 681 int mode, 682 struct ocfs2_dlm_lksb *lksb, 683 u32 flags, 684 void *name, 685 unsigned int namelen) 686 { 687 if (!lksb->lksb_fsdlm.sb_lvbptr) 688 lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb + 689 sizeof(struct dlm_lksb); 690 691 return dlm_lock(conn->cc_lockspace, mode, &lksb->lksb_fsdlm, 692 flags|DLM_LKF_NODLCKWT, name, namelen, 0, 693 fsdlm_lock_ast_wrapper, lksb, 694 fsdlm_blocking_ast_wrapper); 695 } 696 697 static int user_dlm_unlock(struct ocfs2_cluster_connection *conn, 698 struct ocfs2_dlm_lksb *lksb, 699 u32 flags) 700 { 701 return dlm_unlock(conn->cc_lockspace, lksb->lksb_fsdlm.sb_lkid, 702 flags, &lksb->lksb_fsdlm, lksb); 703 } 704 705 static int user_dlm_lock_status(struct ocfs2_dlm_lksb *lksb) 706 { 707 return lksb->lksb_fsdlm.sb_status; 708 } 709 710 static int user_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb) 711 { 712 int invalid = lksb->lksb_fsdlm.sb_flags & DLM_SBF_VALNOTVALID; 713 714 return !invalid; 715 } 716 717 static void *user_dlm_lvb(struct ocfs2_dlm_lksb *lksb) 718 { 719 if (!lksb->lksb_fsdlm.sb_lvbptr) 720 lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb + 721 sizeof(struct dlm_lksb); 722 return (void *)(lksb->lksb_fsdlm.sb_lvbptr); 723 } 724 725 static void user_dlm_dump_lksb(struct ocfs2_dlm_lksb *lksb) 726 { 727 } 728 729 static int user_plock(struct ocfs2_cluster_connection *conn, 730 u64 ino, 731 struct file *file, 732 int cmd, 733 struct file_lock *fl) 734 { 735 /* 736 * This more or less just demuxes the plock request into any 737 * one of three dlm calls. 
738 * 739 * Internally, fs/dlm will pass these to a misc device, which 740 * a userspace daemon will read and write to. 741 */ 742 743 if (cmd == F_CANCELLK) 744 return dlm_posix_cancel(conn->cc_lockspace, ino, file, fl); 745 else if (IS_GETLK(cmd)) 746 return dlm_posix_get(conn->cc_lockspace, ino, file, fl); 747 else if (lock_is_unlock(fl)) 748 return dlm_posix_unlock(conn->cc_lockspace, ino, file, fl); 749 else 750 return dlm_posix_lock(conn->cc_lockspace, ino, file, cmd, fl); 751 } 752 753 /* 754 * Compare a requested locking protocol version against the current one. 755 * 756 * If the major numbers are different, they are incompatible. 757 * If the current minor is greater than the request, they are incompatible. 758 * If the current minor is less than or equal to the request, they are 759 * compatible, and the requester should run at the current minor version. 760 */ 761 static int fs_protocol_compare(struct ocfs2_protocol_version *existing, 762 struct ocfs2_protocol_version *request) 763 { 764 if (existing->pv_major != request->pv_major) 765 return 1; 766 767 if (existing->pv_minor > request->pv_minor) 768 return 1; 769 770 if (existing->pv_minor < request->pv_minor) 771 request->pv_minor = existing->pv_minor; 772 773 return 0; 774 } 775 776 static void lvb_to_version(char *lvb, struct ocfs2_protocol_version *ver) 777 { 778 struct ocfs2_protocol_version *pv = 779 (struct ocfs2_protocol_version *)lvb; 780 /* 781 * ocfs2_protocol_version has two u8 variables, so we don't 782 * need any endian conversion. 783 */ 784 ver->pv_major = pv->pv_major; 785 ver->pv_minor = pv->pv_minor; 786 } 787 788 static void version_to_lvb(struct ocfs2_protocol_version *ver, char *lvb) 789 { 790 struct ocfs2_protocol_version *pv = 791 (struct ocfs2_protocol_version *)lvb; 792 /* 793 * ocfs2_protocol_version has two u8 variables, so we don't 794 * need any endian conversion. 
795 */ 796 pv->pv_major = ver->pv_major; 797 pv->pv_minor = ver->pv_minor; 798 } 799 800 static void sync_wait_cb(void *arg) 801 { 802 struct ocfs2_cluster_connection *conn = arg; 803 struct ocfs2_live_connection *lc = conn->cc_private; 804 complete(&lc->oc_sync_wait); 805 } 806 807 static int sync_unlock(struct ocfs2_cluster_connection *conn, 808 struct dlm_lksb *lksb, char *name) 809 { 810 int error; 811 struct ocfs2_live_connection *lc = conn->cc_private; 812 813 error = dlm_unlock(conn->cc_lockspace, lksb->sb_lkid, 0, lksb, conn); 814 if (error) { 815 printk(KERN_ERR "%s lkid %x error %d\n", 816 name, lksb->sb_lkid, error); 817 return error; 818 } 819 820 wait_for_completion(&lc->oc_sync_wait); 821 822 if (lksb->sb_status != -DLM_EUNLOCK) { 823 printk(KERN_ERR "%s lkid %x status %d\n", 824 name, lksb->sb_lkid, lksb->sb_status); 825 return -1; 826 } 827 return 0; 828 } 829 830 static int sync_lock(struct ocfs2_cluster_connection *conn, 831 int mode, uint32_t flags, 832 struct dlm_lksb *lksb, char *name) 833 { 834 int error, status; 835 struct ocfs2_live_connection *lc = conn->cc_private; 836 837 error = dlm_lock(conn->cc_lockspace, mode, lksb, flags, 838 name, strlen(name), 839 0, sync_wait_cb, conn, NULL); 840 if (error) { 841 printk(KERN_ERR "%s lkid %x flags %x mode %d error %d\n", 842 name, lksb->sb_lkid, flags, mode, error); 843 return error; 844 } 845 846 wait_for_completion(&lc->oc_sync_wait); 847 848 status = lksb->sb_status; 849 850 if (status && status != -EAGAIN) { 851 printk(KERN_ERR "%s lkid %x flags %x mode %d status %d\n", 852 name, lksb->sb_lkid, flags, mode, status); 853 } 854 855 return status; 856 } 857 858 859 static int version_lock(struct ocfs2_cluster_connection *conn, int mode, 860 int flags) 861 { 862 struct ocfs2_live_connection *lc = conn->cc_private; 863 return sync_lock(conn, mode, flags, 864 &lc->oc_version_lksb, VERSION_LOCK); 865 } 866 867 static int version_unlock(struct ocfs2_cluster_connection *conn) 868 { 869 struct 
ocfs2_live_connection *lc = conn->cc_private; 870 return sync_unlock(conn, &lc->oc_version_lksb, VERSION_LOCK); 871 } 872 873 /* get_protocol_version() 874 * 875 * To exchange ocfs2 versioning, we use the LVB of the version dlm lock. 876 * The algorithm is: 877 * 1. Attempt to take the lock in EX mode (non-blocking). 878 * 2. If successful (which means it is the first mount), write the 879 * version number and downconvert to PR lock. 880 * 3. If unsuccessful (returns -EAGAIN), read the version from the LVB after 881 * taking the PR lock. 882 */ 883 884 static int get_protocol_version(struct ocfs2_cluster_connection *conn) 885 { 886 int ret; 887 struct ocfs2_live_connection *lc = conn->cc_private; 888 struct ocfs2_protocol_version pv; 889 890 running_proto.pv_major = 891 ocfs2_user_plugin.sp_max_proto.pv_major; 892 running_proto.pv_minor = 893 ocfs2_user_plugin.sp_max_proto.pv_minor; 894 895 lc->oc_version_lksb.sb_lvbptr = lc->oc_lvb; 896 ret = version_lock(conn, DLM_LOCK_EX, 897 DLM_LKF_VALBLK|DLM_LKF_NOQUEUE); 898 if (!ret) { 899 conn->cc_version.pv_major = running_proto.pv_major; 900 conn->cc_version.pv_minor = running_proto.pv_minor; 901 version_to_lvb(&running_proto, lc->oc_lvb); 902 version_lock(conn, DLM_LOCK_PR, DLM_LKF_CONVERT|DLM_LKF_VALBLK); 903 } else if (ret == -EAGAIN) { 904 ret = version_lock(conn, DLM_LOCK_PR, DLM_LKF_VALBLK); 905 if (ret) 906 goto out; 907 lvb_to_version(lc->oc_lvb, &pv); 908 909 if ((pv.pv_major != running_proto.pv_major) || 910 (pv.pv_minor > running_proto.pv_minor)) { 911 ret = -EINVAL; 912 goto out; 913 } 914 915 conn->cc_version.pv_major = pv.pv_major; 916 conn->cc_version.pv_minor = pv.pv_minor; 917 } 918 out: 919 return ret; 920 } 921 922 static void user_recover_prep(void *arg) 923 { 924 } 925 926 static void user_recover_slot(void *arg, struct dlm_slot *slot) 927 { 928 struct ocfs2_cluster_connection *conn = arg; 929 printk(KERN_INFO "ocfs2: Node %d/%d down. 
Initiating recovery.\n", 930 slot->nodeid, slot->slot); 931 conn->cc_recovery_handler(slot->nodeid, conn->cc_recovery_data); 932 933 } 934 935 static void user_recover_done(void *arg, struct dlm_slot *slots, 936 int num_slots, int our_slot, 937 uint32_t generation) 938 { 939 struct ocfs2_cluster_connection *conn = arg; 940 struct ocfs2_live_connection *lc = conn->cc_private; 941 int i; 942 943 for (i = 0; i < num_slots; i++) 944 if (slots[i].slot == our_slot) { 945 atomic_set(&lc->oc_this_node, slots[i].nodeid); 946 break; 947 } 948 949 lc->oc_our_slot = our_slot; 950 wake_up(&lc->oc_wait); 951 } 952 953 static const struct dlm_lockspace_ops ocfs2_ls_ops = { 954 .recover_prep = user_recover_prep, 955 .recover_slot = user_recover_slot, 956 .recover_done = user_recover_done, 957 }; 958 959 static int user_cluster_disconnect(struct ocfs2_cluster_connection *conn) 960 { 961 version_unlock(conn); 962 dlm_release_lockspace(conn->cc_lockspace, 2); 963 conn->cc_lockspace = NULL; 964 ocfs2_live_connection_drop(conn->cc_private); 965 conn->cc_private = NULL; 966 return 0; 967 } 968 969 static int user_cluster_connect(struct ocfs2_cluster_connection *conn) 970 { 971 dlm_lockspace_t *fsdlm; 972 struct ocfs2_live_connection *lc; 973 int rc, ops_rv; 974 975 BUG_ON(conn == NULL); 976 977 lc = kzalloc(sizeof(struct ocfs2_live_connection), GFP_KERNEL); 978 if (!lc) 979 return -ENOMEM; 980 981 init_waitqueue_head(&lc->oc_wait); 982 init_completion(&lc->oc_sync_wait); 983 atomic_set(&lc->oc_this_node, 0); 984 conn->cc_private = lc; 985 lc->oc_type = NO_CONTROLD; 986 987 rc = dlm_new_lockspace(conn->cc_name, conn->cc_cluster_name, 988 DLM_LSFL_NEWEXCL, DLM_LVB_LEN, 989 &ocfs2_ls_ops, conn, &ops_rv, &fsdlm); 990 if (rc) { 991 if (rc == -EEXIST || rc == -EPROTO) 992 printk(KERN_ERR "ocfs2: Unable to create the " 993 "lockspace %s (%d), because a ocfs2-tools " 994 "program is running on this file system " 995 "with the same name lockspace\n", 996 conn->cc_name, rc); 997 goto out; 998 } 
999 1000 if (ops_rv == -EOPNOTSUPP) { 1001 lc->oc_type = WITH_CONTROLD; 1002 printk(KERN_NOTICE "ocfs2: You seem to be using an older " 1003 "version of dlm_controld and/or ocfs2-tools." 1004 " Please consider upgrading.\n"); 1005 } else if (ops_rv) { 1006 rc = ops_rv; 1007 goto out; 1008 } 1009 conn->cc_lockspace = fsdlm; 1010 1011 rc = ocfs2_live_connection_attach(conn, lc); 1012 if (rc) 1013 goto out; 1014 1015 if (lc->oc_type == NO_CONTROLD) { 1016 rc = get_protocol_version(conn); 1017 if (rc) { 1018 printk(KERN_ERR "ocfs2: Could not determine" 1019 " locking version\n"); 1020 user_cluster_disconnect(conn); 1021 goto out; 1022 } 1023 wait_event(lc->oc_wait, (atomic_read(&lc->oc_this_node) > 0)); 1024 } 1025 1026 /* 1027 * running_proto must have been set before we allowed any mounts 1028 * to proceed. 1029 */ 1030 if (fs_protocol_compare(&running_proto, &conn->cc_version)) { 1031 printk(KERN_ERR 1032 "Unable to mount with fs locking protocol version " 1033 "%u.%u because negotiated protocol is %u.%u\n", 1034 conn->cc_version.pv_major, conn->cc_version.pv_minor, 1035 running_proto.pv_major, running_proto.pv_minor); 1036 rc = -EPROTO; 1037 ocfs2_live_connection_drop(lc); 1038 lc = NULL; 1039 } 1040 1041 out: 1042 if (rc) 1043 kfree(lc); 1044 return rc; 1045 } 1046 1047 1048 static int user_cluster_this_node(struct ocfs2_cluster_connection *conn, 1049 unsigned int *this_node) 1050 { 1051 int rc; 1052 struct ocfs2_live_connection *lc = conn->cc_private; 1053 1054 if (lc->oc_type == WITH_CONTROLD) 1055 rc = ocfs2_control_get_this_node(); 1056 else if (lc->oc_type == NO_CONTROLD) 1057 rc = atomic_read(&lc->oc_this_node); 1058 else 1059 rc = -EINVAL; 1060 1061 if (rc < 0) 1062 return rc; 1063 1064 *this_node = rc; 1065 return 0; 1066 } 1067 1068 static struct ocfs2_stack_operations ocfs2_user_plugin_ops = { 1069 .connect = user_cluster_connect, 1070 .disconnect = user_cluster_disconnect, 1071 .this_node = user_cluster_this_node, 1072 .dlm_lock = user_dlm_lock, 1073 
.dlm_unlock = user_dlm_unlock, 1074 .lock_status = user_dlm_lock_status, 1075 .lvb_valid = user_dlm_lvb_valid, 1076 .lock_lvb = user_dlm_lvb, 1077 .plock = user_plock, 1078 .dump_lksb = user_dlm_dump_lksb, 1079 }; 1080 1081 static struct ocfs2_stack_plugin ocfs2_user_plugin = { 1082 .sp_name = "user", 1083 .sp_ops = &ocfs2_user_plugin_ops, 1084 .sp_owner = THIS_MODULE, 1085 }; 1086 1087 1088 static int __init ocfs2_user_plugin_init(void) 1089 { 1090 int rc; 1091 1092 rc = ocfs2_control_init(); 1093 if (!rc) { 1094 rc = ocfs2_stack_glue_register(&ocfs2_user_plugin); 1095 if (rc) 1096 ocfs2_control_exit(); 1097 } 1098 1099 return rc; 1100 } 1101 1102 static void __exit ocfs2_user_plugin_exit(void) 1103 { 1104 ocfs2_stack_glue_unregister(&ocfs2_user_plugin); 1105 ocfs2_control_exit(); 1106 } 1107 1108 MODULE_AUTHOR("Oracle"); 1109 MODULE_DESCRIPTION("ocfs2 driver for userspace cluster stacks"); 1110 MODULE_LICENSE("GPL"); 1111 module_init(ocfs2_user_plugin_init); 1112 module_exit(ocfs2_user_plugin_exit); 1113