1 // SPDX-License-Identifier: GPL-2.0-only 2 /* 3 * stack_user.c 4 * 5 * Code which interfaces ocfs2 with fs/dlm and a userspace stack. 6 * 7 * Copyright (C) 2007 Oracle. All rights reserved. 8 */ 9 10 #include <linux/module.h> 11 #include <linux/fs.h> 12 #include <linux/filelock.h> 13 #include <linux/miscdevice.h> 14 #include <linux/mutex.h> 15 #include <linux/slab.h> 16 #include <linux/reboot.h> 17 #include <linux/sched.h> 18 #include <linux/uaccess.h> 19 20 #include "stackglue.h" 21 22 #include <linux/dlm_plock.h> 23 24 /* 25 * The control protocol starts with a handshake. Until the handshake 26 * is complete, the control device will fail all write(2)s. 27 * 28 * The handshake is simple. First, the client reads until EOF. Each line 29 * of output is a supported protocol tag. All protocol tags are a single 30 * character followed by a two hex digit version number. Currently the 31 * only things supported is T01, for "Text-base version 0x01". Next, the 32 * client writes the version they would like to use, including the newline. 33 * Thus, the protocol tag is 'T01\n'. If the version tag written is 34 * unknown, -EINVAL is returned. Once the negotiation is complete, the 35 * client can start sending messages. 36 * 37 * The T01 protocol has three messages. First is the "SETN" message. 38 * It has the following syntax: 39 * 40 * SETN<space><8-char-hex-nodenum><newline> 41 * 42 * This is 14 characters. 43 * 44 * The "SETN" message must be the first message following the protocol. 45 * It tells ocfs2_control the local node number. 46 * 47 * Next comes the "SETV" message. It has the following syntax: 48 * 49 * SETV<space><2-char-hex-major><space><2-char-hex-minor><newline> 50 * 51 * This is 11 characters. 52 * 53 * The "SETV" message sets the filesystem locking protocol version as 54 * negotiated by the client. The client negotiates based on the maximum 55 * version advertised in /sys/fs/ocfs2/max_locking_protocol. The major 56 * number from the "SETV" message must match 57 * ocfs2_user_plugin.sp_max_proto.pv_major, and the minor number 58 * must be less than or equal to ...sp_max_version.pv_minor. 59 * 60 * Once this information has been set, mounts will be allowed. From this 61 * point on, the "DOWN" message can be sent for node down notification. 62 * It has the following syntax: 63 * 64 * DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline> 65 * 66 * eg: 67 * 68 * DOWN 632A924FDD844190BDA93C0DF6B94899 00000001\n 69 * 70 * This is 47 characters. 71 */ 72 73 /* 74 * Whether or not the client has done the handshake. 75 * For now, we have just one protocol version. 76 */ 77 #define OCFS2_CONTROL_PROTO "T01\n" 78 #define OCFS2_CONTROL_PROTO_LEN 4 79 80 /* Handshake states */ 81 #define OCFS2_CONTROL_HANDSHAKE_INVALID (0) 82 #define OCFS2_CONTROL_HANDSHAKE_READ (1) 83 #define OCFS2_CONTROL_HANDSHAKE_PROTOCOL (2) 84 #define OCFS2_CONTROL_HANDSHAKE_VALID (3) 85 86 /* Messages */ 87 #define OCFS2_CONTROL_MESSAGE_OP_LEN 4 88 #define OCFS2_CONTROL_MESSAGE_SETNODE_OP "SETN" 89 #define OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN 14 90 #define OCFS2_CONTROL_MESSAGE_SETVERSION_OP "SETV" 91 #define OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN 11 92 #define OCFS2_CONTROL_MESSAGE_DOWN_OP "DOWN" 93 #define OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN 47 94 #define OCFS2_TEXT_UUID_LEN 32 95 #define OCFS2_CONTROL_MESSAGE_VERNUM_LEN 2 96 #define OCFS2_CONTROL_MESSAGE_NODENUM_LEN 8 97 #define VERSION_LOCK "version_lock" 98 99 enum ocfs2_connection_type { 100 WITH_CONTROLD, 101 NO_CONTROLD 102 }; 103 104 /* 105 * ocfs2_live_connection is refcounted because the filesystem and 106 * miscdevice sides can detach in different order. Let's just be safe. 107 */ 108 struct ocfs2_live_connection { 109 struct list_head oc_list; 110 struct ocfs2_cluster_connection *oc_conn; 111 enum ocfs2_connection_type oc_type; 112 atomic_t oc_this_node; 113 int oc_our_slot; 114 struct dlm_lksb oc_version_lksb; 115 char oc_lvb[DLM_LVB_LEN]; 116 struct completion oc_sync_wait; 117 wait_queue_head_t oc_wait; 118 }; 119 120 struct ocfs2_control_private { 121 struct list_head op_list; 122 int op_state; 123 int op_this_node; 124 struct ocfs2_protocol_version op_proto; 125 }; 126 127 /* SETN<space><8-char-hex-nodenum><newline> */ 128 struct ocfs2_control_message_setn { 129 char tag[OCFS2_CONTROL_MESSAGE_OP_LEN]; 130 char space; 131 char nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN]; 132 char newline; 133 }; 134 135 /* SETV<space><2-char-hex-major><space><2-char-hex-minor><newline> */ 136 struct ocfs2_control_message_setv { 137 char tag[OCFS2_CONTROL_MESSAGE_OP_LEN]; 138 char space1; 139 char major[OCFS2_CONTROL_MESSAGE_VERNUM_LEN]; 140 char space2; 141 char minor[OCFS2_CONTROL_MESSAGE_VERNUM_LEN]; 142 char newline; 143 }; 144 145 /* DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline> */ 146 struct ocfs2_control_message_down { 147 char tag[OCFS2_CONTROL_MESSAGE_OP_LEN]; 148 char space1; 149 char uuid[OCFS2_TEXT_UUID_LEN]; 150 char space2; 151 char nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN]; 152 char newline; 153 }; 154 155 union ocfs2_control_message { 156 char tag[OCFS2_CONTROL_MESSAGE_OP_LEN]; 157 struct ocfs2_control_message_setn u_setn; 158 struct ocfs2_control_message_setv u_setv; 159 struct ocfs2_control_message_down u_down; 160 }; 161 162 static struct ocfs2_stack_plugin ocfs2_user_plugin; 163 164 static atomic_t ocfs2_control_opened; 165 static int ocfs2_control_this_node = -1; 166 static struct ocfs2_protocol_version running_proto; 167 168 static LIST_HEAD(ocfs2_live_connection_list); 169 static LIST_HEAD(ocfs2_control_private_list); 170 static DEFINE_MUTEX(ocfs2_control_lock); 171 172 static inline void ocfs2_control_set_handshake_state(struct file *file, 173 int state) 174 { 175 struct ocfs2_control_private *p = file->private_data; 176 p->op_state = state; 177 } 178 179 static inline int ocfs2_control_get_handshake_state(struct file *file) 180 { 181 struct ocfs2_control_private *p = file->private_data; 182 return p->op_state; 183 } 184 185 static struct ocfs2_live_connection *ocfs2_connection_find(const char *name) 186 { 187 size_t len = strlen(name); 188 struct ocfs2_live_connection *c; 189 190 BUG_ON(!mutex_is_locked(&ocfs2_control_lock)); 191 192 list_for_each_entry(c, &ocfs2_live_connection_list, oc_list) { 193 if ((c->oc_conn->cc_namelen == len) && 194 !strncmp(c->oc_conn->cc_name, name, len)) 195 return c; 196 } 197 198 return NULL; 199 } 200 201 /* 202 * ocfs2_live_connection structures are created underneath the ocfs2 203 * mount path. Since the VFS prevents multiple calls to 204 * fill_super(), we can't get dupes here. 205 */ 206 static int ocfs2_live_connection_attach(struct ocfs2_cluster_connection *conn, 207 struct ocfs2_live_connection *c) 208 { 209 int rc = 0; 210 211 mutex_lock(&ocfs2_control_lock); 212 c->oc_conn = conn; 213 214 if ((c->oc_type == NO_CONTROLD) || atomic_read(&ocfs2_control_opened)) 215 list_add(&c->oc_list, &ocfs2_live_connection_list); 216 else { 217 printk(KERN_ERR 218 "ocfs2: Userspace control daemon is not present\n"); 219 rc = -ESRCH; 220 } 221 222 mutex_unlock(&ocfs2_control_lock); 223 return rc; 224 } 225 226 /* 227 * This function disconnects the cluster connection from ocfs2_control. 228 * Afterwards, userspace can't affect the cluster connection. 229 */ 230 static void ocfs2_live_connection_drop(struct ocfs2_live_connection *c) 231 { 232 mutex_lock(&ocfs2_control_lock); 233 list_del_init(&c->oc_list); 234 c->oc_conn = NULL; 235 mutex_unlock(&ocfs2_control_lock); 236 237 kfree(c); 238 } 239 240 static int ocfs2_control_cfu(void *target, size_t target_len, 241 const char __user *buf, size_t count) 242 { 243 /* The T01 expects write(2) calls to have exactly one command */ 244 if ((count != target_len) || 245 (count > sizeof(union ocfs2_control_message))) 246 return -EINVAL; 247 248 if (copy_from_user(target, buf, target_len)) 249 return -EFAULT; 250 251 return 0; 252 } 253 254 static ssize_t ocfs2_control_validate_protocol(struct file *file, 255 const char __user *buf, 256 size_t count) 257 { 258 ssize_t ret; 259 char kbuf[OCFS2_CONTROL_PROTO_LEN]; 260 261 ret = ocfs2_control_cfu(kbuf, OCFS2_CONTROL_PROTO_LEN, 262 buf, count); 263 if (ret) 264 return ret; 265 266 if (strncmp(kbuf, OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN)) 267 return -EINVAL; 268 269 ocfs2_control_set_handshake_state(file, 270 OCFS2_CONTROL_HANDSHAKE_PROTOCOL); 271 272 return count; 273 } 274 275 static void ocfs2_control_send_down(const char *uuid, 276 int nodenum) 277 { 278 struct ocfs2_live_connection *c; 279 280 mutex_lock(&ocfs2_control_lock); 281 282 c = ocfs2_connection_find(uuid); 283 if (c) { 284 BUG_ON(c->oc_conn == NULL); 285 c->oc_conn->cc_recovery_handler(nodenum, 286 c->oc_conn->cc_recovery_data); 287 } 288 289 mutex_unlock(&ocfs2_control_lock); 290 } 291 292 /* 293 * Called whenever configuration elements are sent to /dev/ocfs2_control. 294 * If all configuration elements are present, try to set the global 295 * values. If there is a problem, return an error. Skip any missing 296 * elements, and only bump ocfs2_control_opened when we have all elements 297 * and are successful. 298 */ 299 static int ocfs2_control_install_private(struct file *file) 300 { 301 int rc = 0; 302 int set_p = 1; 303 struct ocfs2_control_private *p = file->private_data; 304 305 BUG_ON(p->op_state != OCFS2_CONTROL_HANDSHAKE_PROTOCOL); 306 307 mutex_lock(&ocfs2_control_lock); 308 309 if (p->op_this_node < 0) { 310 set_p = 0; 311 } else if ((ocfs2_control_this_node >= 0) && 312 (ocfs2_control_this_node != p->op_this_node)) { 313 rc = -EINVAL; 314 goto out_unlock; 315 } 316 317 if (!p->op_proto.pv_major) { 318 set_p = 0; 319 } else if (!list_empty(&ocfs2_live_connection_list) && 320 ((running_proto.pv_major != p->op_proto.pv_major) || 321 (running_proto.pv_minor != p->op_proto.pv_minor))) { 322 rc = -EINVAL; 323 goto out_unlock; 324 } 325 326 if (set_p) { 327 ocfs2_control_this_node = p->op_this_node; 328 running_proto.pv_major = p->op_proto.pv_major; 329 running_proto.pv_minor = p->op_proto.pv_minor; 330 } 331 332 out_unlock: 333 mutex_unlock(&ocfs2_control_lock); 334 335 if (!rc && set_p) { 336 /* We set the global values successfully */ 337 atomic_inc(&ocfs2_control_opened); 338 ocfs2_control_set_handshake_state(file, 339 OCFS2_CONTROL_HANDSHAKE_VALID); 340 } 341 342 return rc; 343 } 344 345 static int ocfs2_control_get_this_node(void) 346 { 347 int rc; 348 349 mutex_lock(&ocfs2_control_lock); 350 if (ocfs2_control_this_node < 0) 351 rc = -EINVAL; 352 else 353 rc = ocfs2_control_this_node; 354 mutex_unlock(&ocfs2_control_lock); 355 356 return rc; 357 } 358 359 static int ocfs2_control_do_setnode_msg(struct file *file, 360 struct ocfs2_control_message_setn *msg) 361 { 362 long nodenum; 363 struct ocfs2_control_private *p = file->private_data; 364 365 if (ocfs2_control_get_handshake_state(file) != 366 OCFS2_CONTROL_HANDSHAKE_PROTOCOL) 367 return -EINVAL; 368 369 if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP, 370 OCFS2_CONTROL_MESSAGE_OP_LEN)) 371 return -EINVAL; 372 373 if ((msg->space != ' ') || (msg->newline != '\n')) 374 return -EINVAL; 375 msg->space = msg->newline = '\0'; 376 377 if (kstrtol(msg->nodestr, 16, &nodenum)) 378 return -EINVAL; 379 380 if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) || 381 (nodenum > INT_MAX) || (nodenum < 0)) 382 return -ERANGE; 383 p->op_this_node = nodenum; 384 385 return ocfs2_control_install_private(file); 386 } 387 388 static int ocfs2_control_do_setversion_msg(struct file *file, 389 struct ocfs2_control_message_setv *msg) 390 { 391 long major, minor; 392 struct ocfs2_control_private *p = file->private_data; 393 struct ocfs2_protocol_version *max = 394 &ocfs2_user_plugin.sp_max_proto; 395 396 if (ocfs2_control_get_handshake_state(file) != 397 OCFS2_CONTROL_HANDSHAKE_PROTOCOL) 398 return -EINVAL; 399 400 if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP, 401 OCFS2_CONTROL_MESSAGE_OP_LEN)) 402 return -EINVAL; 403 404 if ((msg->space1 != ' ') || (msg->space2 != ' ') || 405 (msg->newline != '\n')) 406 return -EINVAL; 407 msg->space1 = msg->space2 = msg->newline = '\0'; 408 409 if (kstrtol(msg->major, 16, &major)) 410 return -EINVAL; 411 if (kstrtol(msg->minor, 16, &minor)) 412 return -EINVAL; 413 414 /* 415 * The major must be between 1 and 255, inclusive. The minor 416 * must be between 0 and 255, inclusive. The version passed in 417 * must be within the maximum version supported by the filesystem. 418 */ 419 if ((major == LONG_MIN) || (major == LONG_MAX) || 420 (major > (u8)-1) || (major < 1)) 421 return -ERANGE; 422 if ((minor == LONG_MIN) || (minor == LONG_MAX) || 423 (minor > (u8)-1) || (minor < 0)) 424 return -ERANGE; 425 if ((major != max->pv_major) || 426 (minor > max->pv_minor)) 427 return -EINVAL; 428 429 p->op_proto.pv_major = major; 430 p->op_proto.pv_minor = minor; 431 432 return ocfs2_control_install_private(file); 433 } 434 435 static int ocfs2_control_do_down_msg(struct file *file, 436 struct ocfs2_control_message_down *msg) 437 { 438 long nodenum; 439 440 if (ocfs2_control_get_handshake_state(file) != 441 OCFS2_CONTROL_HANDSHAKE_VALID) 442 return -EINVAL; 443 444 if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_DOWN_OP, 445 OCFS2_CONTROL_MESSAGE_OP_LEN)) 446 return -EINVAL; 447 448 if ((msg->space1 != ' ') || (msg->space2 != ' ') || 449 (msg->newline != '\n')) 450 return -EINVAL; 451 msg->space1 = msg->space2 = msg->newline = '\0'; 452 453 if (kstrtol(msg->nodestr, 16, &nodenum)) 454 return -EINVAL; 455 456 if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) || 457 (nodenum > INT_MAX) || (nodenum < 0)) 458 return -ERANGE; 459 460 ocfs2_control_send_down(msg->uuid, nodenum); 461 462 return 0; 463 } 464 465 static ssize_t ocfs2_control_message(struct file *file, 466 const char __user *buf, 467 size_t count) 468 { 469 ssize_t ret; 470 union ocfs2_control_message msg; 471 472 /* Try to catch padding issues */ 473 WARN_ON(offsetof(struct ocfs2_control_message_down, uuid) != 474 (sizeof(msg.u_down.tag) + sizeof(msg.u_down.space1))); 475 476 memset(&msg, 0, sizeof(union ocfs2_control_message)); 477 ret = ocfs2_control_cfu(&msg, count, buf, count); 478 if (ret) 479 goto out; 480 481 if ((count == OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN) && 482 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP, 483 OCFS2_CONTROL_MESSAGE_OP_LEN)) 484 ret = ocfs2_control_do_setnode_msg(file, &msg.u_setn); 485 else if ((count == OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN) && 486 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP, 487 OCFS2_CONTROL_MESSAGE_OP_LEN)) 488 ret = ocfs2_control_do_setversion_msg(file, &msg.u_setv); 489 else if ((count == OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN) && 490 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_DOWN_OP, 491 OCFS2_CONTROL_MESSAGE_OP_LEN)) 492 ret = ocfs2_control_do_down_msg(file, &msg.u_down); 493 else 494 ret = -EINVAL; 495 496 out: 497 return ret ? ret : count; 498 } 499 500 static ssize_t ocfs2_control_write(struct file *file, 501 const char __user *buf, 502 size_t count, 503 loff_t *ppos) 504 { 505 ssize_t ret; 506 507 switch (ocfs2_control_get_handshake_state(file)) { 508 case OCFS2_CONTROL_HANDSHAKE_INVALID: 509 ret = -EINVAL; 510 break; 511 512 case OCFS2_CONTROL_HANDSHAKE_READ: 513 ret = ocfs2_control_validate_protocol(file, buf, 514 count); 515 break; 516 517 case OCFS2_CONTROL_HANDSHAKE_PROTOCOL: 518 case OCFS2_CONTROL_HANDSHAKE_VALID: 519 ret = ocfs2_control_message(file, buf, count); 520 break; 521 522 default: 523 BUG(); 524 ret = -EIO; 525 break; 526 } 527 528 return ret; 529 } 530 531 /* 532 * This is a naive version. If we ever have a new protocol, we'll expand 533 * it. Probably using seq_file. 534 */ 535 static ssize_t ocfs2_control_read(struct file *file, 536 char __user *buf, 537 size_t count, 538 loff_t *ppos) 539 { 540 ssize_t ret; 541 542 ret = simple_read_from_buffer(buf, count, ppos, 543 OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN); 544 545 /* Have we read the whole protocol list? */ 546 if (ret > 0 && *ppos >= OCFS2_CONTROL_PROTO_LEN) 547 ocfs2_control_set_handshake_state(file, 548 OCFS2_CONTROL_HANDSHAKE_READ); 549 550 return ret; 551 } 552 553 static int ocfs2_control_release(struct inode *inode, struct file *file) 554 { 555 struct ocfs2_control_private *p = file->private_data; 556 557 mutex_lock(&ocfs2_control_lock); 558 559 if (ocfs2_control_get_handshake_state(file) != 560 OCFS2_CONTROL_HANDSHAKE_VALID) 561 goto out; 562 563 if (atomic_dec_and_test(&ocfs2_control_opened)) { 564 if (!list_empty(&ocfs2_live_connection_list)) { 565 /* XXX: Do bad things! */ 566 printk(KERN_ERR 567 "ocfs2: Unexpected release of ocfs2_control!\n" 568 " Loss of cluster connection requires " 569 "an emergency restart!\n"); 570 emergency_restart(); 571 } 572 /* 573 * Last valid close clears the node number and resets 574 * the locking protocol version 575 */ 576 ocfs2_control_this_node = -1; 577 running_proto.pv_major = 0; 578 running_proto.pv_minor = 0; 579 } 580 581 out: 582 list_del_init(&p->op_list); 583 file->private_data = NULL; 584 585 mutex_unlock(&ocfs2_control_lock); 586 587 kfree(p); 588 589 return 0; 590 } 591 592 static int ocfs2_control_open(struct inode *inode, struct file *file) 593 { 594 struct ocfs2_control_private *p; 595 596 p = kzalloc(sizeof(struct ocfs2_control_private), GFP_KERNEL); 597 if (!p) 598 return -ENOMEM; 599 p->op_this_node = -1; 600 601 mutex_lock(&ocfs2_control_lock); 602 file->private_data = p; 603 list_add(&p->op_list, &ocfs2_control_private_list); 604 mutex_unlock(&ocfs2_control_lock); 605 606 return 0; 607 } 608 609 static const struct file_operations ocfs2_control_fops = { 610 .open = ocfs2_control_open, 611 .release = ocfs2_control_release, 612 .read = ocfs2_control_read, 613 .write = ocfs2_control_write, 614 .owner = THIS_MODULE, 615 .llseek = default_llseek, 616 }; 617 618 static struct miscdevice ocfs2_control_device = { 619 .minor = MISC_DYNAMIC_MINOR, 620 .name = "ocfs2_control", 621 .fops = &ocfs2_control_fops, 622 }; 623 624 static int ocfs2_control_init(void) 625 { 626 int rc; 627 628 atomic_set(&ocfs2_control_opened, 0); 629 630 rc = misc_register(&ocfs2_control_device); 631 if (rc) 632 printk(KERN_ERR 633 "ocfs2: Unable to register ocfs2_control device " 634 "(errno %d)\n", 635 -rc); 636 637 return rc; 638 } 639 640 static void ocfs2_control_exit(void) 641 { 642 misc_deregister(&ocfs2_control_device); 643 } 644 645 static void fsdlm_lock_ast_wrapper(void *astarg) 646 { 647 struct ocfs2_dlm_lksb *lksb = astarg; 648 int status = lksb->lksb_fsdlm.sb_status; 649 650 /* 651 * For now we're punting on the issue of other non-standard errors 652 * where we can't tell if the unlock_ast or lock_ast should be called. 653 * The main "other error" that's possible is EINVAL which means the 654 * function was called with invalid args, which shouldn't be possible 655 * since the caller here is under our control. Other non-standard 656 * errors probably fall into the same category, or otherwise are fatal 657 * which means we can't carry on anyway. 658 */ 659 660 if (status == -DLM_EUNLOCK || status == -DLM_ECANCEL) 661 lksb->lksb_conn->cc_proto->lp_unlock_ast(lksb, 0); 662 else 663 lksb->lksb_conn->cc_proto->lp_lock_ast(lksb); 664 } 665 666 static void fsdlm_blocking_ast_wrapper(void *astarg, int level) 667 { 668 struct ocfs2_dlm_lksb *lksb = astarg; 669 670 lksb->lksb_conn->cc_proto->lp_blocking_ast(lksb, level); 671 } 672 673 static int user_dlm_lock(struct ocfs2_cluster_connection *conn, 674 int mode, 675 struct ocfs2_dlm_lksb *lksb, 676 u32 flags, 677 void *name, 678 unsigned int namelen) 679 { 680 if (!lksb->lksb_fsdlm.sb_lvbptr) 681 lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb + 682 sizeof(struct dlm_lksb); 683 684 return dlm_lock(conn->cc_lockspace, mode, &lksb->lksb_fsdlm, 685 flags|DLM_LKF_NODLCKWT, name, namelen, 0, 686 fsdlm_lock_ast_wrapper, lksb, 687 fsdlm_blocking_ast_wrapper); 688 } 689 690 static int user_dlm_unlock(struct ocfs2_cluster_connection *conn, 691 struct ocfs2_dlm_lksb *lksb, 692 u32 flags) 693 { 694 return dlm_unlock(conn->cc_lockspace, lksb->lksb_fsdlm.sb_lkid, 695 flags, &lksb->lksb_fsdlm, lksb); 696 } 697 698 static int user_dlm_lock_status(struct ocfs2_dlm_lksb *lksb) 699 { 700 return lksb->lksb_fsdlm.sb_status; 701 } 702 703 static int user_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb) 704 { 705 int invalid = lksb->lksb_fsdlm.sb_flags & DLM_SBF_VALNOTVALID; 706 707 return !invalid; 708 } 709 710 static void *user_dlm_lvb(struct ocfs2_dlm_lksb *lksb) 711 { 712 if (!lksb->lksb_fsdlm.sb_lvbptr) 713 lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb + 714 sizeof(struct dlm_lksb); 715 return (void *)(lksb->lksb_fsdlm.sb_lvbptr); 716 } 717 718 static void user_dlm_dump_lksb(struct ocfs2_dlm_lksb *lksb) 719 { 720 } 721 722 static int user_plock(struct ocfs2_cluster_connection *conn, 723 u64 ino, 724 struct file *file, 725 int cmd, 726 struct file_lock *fl) 727 { 728 /* 729 * This more or less just demuxes the plock request into any 730 * one of three dlm calls. 731 * 732 * Internally, fs/dlm will pass these to a misc device, which 733 * a userspace daemon will read and write to. 734 */ 735 736 if (cmd == F_CANCELLK) 737 return dlm_posix_cancel(conn->cc_lockspace, ino, file, fl); 738 else if (IS_GETLK(cmd)) 739 return dlm_posix_get(conn->cc_lockspace, ino, file, fl); 740 else if (lock_is_unlock(fl)) 741 return dlm_posix_unlock(conn->cc_lockspace, ino, file, fl); 742 else 743 return dlm_posix_lock(conn->cc_lockspace, ino, file, cmd, fl); 744 } 745 746 /* 747 * Compare a requested locking protocol version against the current one. 748 * 749 * If the major numbers are different, they are incompatible. 750 * If the current minor is greater than the request, they are incompatible. 751 * If the current minor is less than or equal to the request, they are 752 * compatible, and the requester should run at the current minor version. 753 */ 754 static int fs_protocol_compare(struct ocfs2_protocol_version *existing, 755 struct ocfs2_protocol_version *request) 756 { 757 if (existing->pv_major != request->pv_major) 758 return 1; 759 760 if (existing->pv_minor > request->pv_minor) 761 return 1; 762 763 if (existing->pv_minor < request->pv_minor) 764 request->pv_minor = existing->pv_minor; 765 766 return 0; 767 } 768 769 static void lvb_to_version(char *lvb, struct ocfs2_protocol_version *ver) 770 { 771 struct ocfs2_protocol_version *pv = 772 (struct ocfs2_protocol_version *)lvb; 773 /* 774 * ocfs2_protocol_version has two u8 variables, so we don't 775 * need any endian conversion. 776 */ 777 ver->pv_major = pv->pv_major; 778 ver->pv_minor = pv->pv_minor; 779 } 780 781 static void version_to_lvb(struct ocfs2_protocol_version *ver, char *lvb) 782 { 783 struct ocfs2_protocol_version *pv = 784 (struct ocfs2_protocol_version *)lvb; 785 /* 786 * ocfs2_protocol_version has two u8 variables, so we don't 787 * need any endian conversion. 788 */ 789 pv->pv_major = ver->pv_major; 790 pv->pv_minor = ver->pv_minor; 791 } 792 793 static void sync_wait_cb(void *arg) 794 { 795 struct ocfs2_cluster_connection *conn = arg; 796 struct ocfs2_live_connection *lc = conn->cc_private; 797 complete(&lc->oc_sync_wait); 798 } 799 800 static int sync_unlock(struct ocfs2_cluster_connection *conn, 801 struct dlm_lksb *lksb, char *name) 802 { 803 int error; 804 struct ocfs2_live_connection *lc = conn->cc_private; 805 806 error = dlm_unlock(conn->cc_lockspace, lksb->sb_lkid, 0, lksb, conn); 807 if (error) { 808 printk(KERN_ERR "%s lkid %x error %d\n", 809 name, lksb->sb_lkid, error); 810 return error; 811 } 812 813 wait_for_completion(&lc->oc_sync_wait); 814 815 if (lksb->sb_status != -DLM_EUNLOCK) { 816 printk(KERN_ERR "%s lkid %x status %d\n", 817 name, lksb->sb_lkid, lksb->sb_status); 818 return -1; 819 } 820 return 0; 821 } 822 823 static int sync_lock(struct ocfs2_cluster_connection *conn, 824 int mode, uint32_t flags, 825 struct dlm_lksb *lksb, char *name) 826 { 827 int error, status; 828 struct ocfs2_live_connection *lc = conn->cc_private; 829 830 error = dlm_lock(conn->cc_lockspace, mode, lksb, flags, 831 name, strlen(name), 832 0, sync_wait_cb, conn, NULL); 833 if (error) { 834 printk(KERN_ERR "%s lkid %x flags %x mode %d error %d\n", 835 name, lksb->sb_lkid, flags, mode, error); 836 return error; 837 } 838 839 wait_for_completion(&lc->oc_sync_wait); 840 841 status = lksb->sb_status; 842 843 if (status && status != -EAGAIN) { 844 printk(KERN_ERR "%s lkid %x flags %x mode %d status %d\n", 845 name, lksb->sb_lkid, flags, mode, status); 846 } 847 848 return status; 849 } 850 851 852 static int version_lock(struct ocfs2_cluster_connection *conn, int mode, 853 int flags) 854 { 855 struct ocfs2_live_connection *lc = conn->cc_private; 856 return sync_lock(conn, mode, flags, 857 &lc->oc_version_lksb, VERSION_LOCK); 858 } 859 860 static int version_unlock(struct ocfs2_cluster_connection *conn) 861 { 862 struct ocfs2_live_connection *lc = conn->cc_private; 863 return sync_unlock(conn, &lc->oc_version_lksb, VERSION_LOCK); 864 } 865 866 /* get_protocol_version() 867 * 868 * To exchange ocfs2 versioning, we use the LVB of the version dlm lock. 869 * The algorithm is: 870 * 1. Attempt to take the lock in EX mode (non-blocking). 871 * 2. If successful (which means it is the first mount), write the 872 * version number and downconvert to PR lock. 873 * 3. If unsuccessful (returns -EAGAIN), read the version from the LVB after 874 * taking the PR lock. 875 */ 876 877 static int get_protocol_version(struct ocfs2_cluster_connection *conn) 878 { 879 int ret; 880 struct ocfs2_live_connection *lc = conn->cc_private; 881 struct ocfs2_protocol_version pv; 882 883 running_proto.pv_major = 884 ocfs2_user_plugin.sp_max_proto.pv_major; 885 running_proto.pv_minor = 886 ocfs2_user_plugin.sp_max_proto.pv_minor; 887 888 lc->oc_version_lksb.sb_lvbptr = lc->oc_lvb; 889 ret = version_lock(conn, DLM_LOCK_EX, 890 DLM_LKF_VALBLK|DLM_LKF_NOQUEUE); 891 if (!ret) { 892 conn->cc_version.pv_major = running_proto.pv_major; 893 conn->cc_version.pv_minor = running_proto.pv_minor; 894 version_to_lvb(&running_proto, lc->oc_lvb); 895 version_lock(conn, DLM_LOCK_PR, DLM_LKF_CONVERT|DLM_LKF_VALBLK); 896 } else if (ret == -EAGAIN) { 897 ret = version_lock(conn, DLM_LOCK_PR, DLM_LKF_VALBLK); 898 if (ret) 899 goto out; 900 lvb_to_version(lc->oc_lvb, &pv); 901 902 if ((pv.pv_major != running_proto.pv_major) || 903 (pv.pv_minor > running_proto.pv_minor)) { 904 ret = -EINVAL; 905 goto out; 906 } 907 908 conn->cc_version.pv_major = pv.pv_major; 909 conn->cc_version.pv_minor = pv.pv_minor; 910 } 911 out: 912 return ret; 913 } 914 915 static void user_recover_prep(void *arg) 916 { 917 } 918 919 static void user_recover_slot(void *arg, struct dlm_slot *slot) 920 { 921 struct ocfs2_cluster_connection *conn = arg; 922 printk(KERN_INFO "ocfs2: Node %d/%d down. Initiating recovery.\n", 923 slot->nodeid, slot->slot); 924 conn->cc_recovery_handler(slot->nodeid, conn->cc_recovery_data); 925 926 } 927 928 static void user_recover_done(void *arg, struct dlm_slot *slots, 929 int num_slots, int our_slot, 930 uint32_t generation) 931 { 932 struct ocfs2_cluster_connection *conn = arg; 933 struct ocfs2_live_connection *lc = conn->cc_private; 934 int i; 935 936 for (i = 0; i < num_slots; i++) 937 if (slots[i].slot == our_slot) { 938 atomic_set(&lc->oc_this_node, slots[i].nodeid); 939 break; 940 } 941 942 lc->oc_our_slot = our_slot; 943 wake_up(&lc->oc_wait); 944 } 945 946 static const struct dlm_lockspace_ops ocfs2_ls_ops = { 947 .recover_prep = user_recover_prep, 948 .recover_slot = user_recover_slot, 949 .recover_done = user_recover_done, 950 }; 951 952 static int user_cluster_disconnect(struct ocfs2_cluster_connection *conn) 953 { 954 version_unlock(conn); 955 dlm_release_lockspace(conn->cc_lockspace, 2); 956 conn->cc_lockspace = NULL; 957 ocfs2_live_connection_drop(conn->cc_private); 958 conn->cc_private = NULL; 959 return 0; 960 } 961 962 static int user_cluster_connect(struct ocfs2_cluster_connection *conn) 963 { 964 dlm_lockspace_t *fsdlm; 965 struct ocfs2_live_connection *lc; 966 int rc, ops_rv; 967 968 BUG_ON(conn == NULL); 969 970 lc = kzalloc(sizeof(struct ocfs2_live_connection), GFP_KERNEL); 971 if (!lc) 972 return -ENOMEM; 973 974 init_waitqueue_head(&lc->oc_wait); 975 init_completion(&lc->oc_sync_wait); 976 atomic_set(&lc->oc_this_node, 0); 977 conn->cc_private = lc; 978 lc->oc_type = NO_CONTROLD; 979 980 rc = dlm_new_lockspace(conn->cc_name, conn->cc_cluster_name, 981 DLM_LSFL_NEWEXCL, DLM_LVB_LEN, 982 &ocfs2_ls_ops, conn, &ops_rv, &fsdlm); 983 if (rc) { 984 if (rc == -EEXIST || rc == -EPROTO) 985 printk(KERN_ERR "ocfs2: Unable to create the " 986 "lockspace %s (%d), because a ocfs2-tools " 987 "program is running on this file system " 988 "with the same name lockspace\n", 989 conn->cc_name, rc); 990 goto out; 991 } 992 993 if (ops_rv == -EOPNOTSUPP) { 994 lc->oc_type = WITH_CONTROLD; 995 printk(KERN_NOTICE "ocfs2: You seem to be using an older " 996 "version of dlm_controld and/or ocfs2-tools." 997 " Please consider upgrading.\n"); 998 } else if (ops_rv) { 999 rc = ops_rv; 1000 goto out; 1001 } 1002 conn->cc_lockspace = fsdlm; 1003 1004 rc = ocfs2_live_connection_attach(conn, lc); 1005 if (rc) 1006 goto out; 1007 1008 if (lc->oc_type == NO_CONTROLD) { 1009 rc = get_protocol_version(conn); 1010 if (rc) { 1011 printk(KERN_ERR "ocfs2: Could not determine" 1012 " locking version\n"); 1013 user_cluster_disconnect(conn); 1014 goto out; 1015 } 1016 wait_event(lc->oc_wait, (atomic_read(&lc->oc_this_node) > 0)); 1017 } 1018 1019 /* 1020 * running_proto must have been set before we allowed any mounts 1021 * to proceed. 1022 */ 1023 if (fs_protocol_compare(&running_proto, &conn->cc_version)) { 1024 printk(KERN_ERR 1025 "Unable to mount with fs locking protocol version " 1026 "%u.%u because negotiated protocol is %u.%u\n", 1027 conn->cc_version.pv_major, conn->cc_version.pv_minor, 1028 running_proto.pv_major, running_proto.pv_minor); 1029 rc = -EPROTO; 1030 ocfs2_live_connection_drop(lc); 1031 lc = NULL; 1032 } 1033 1034 out: 1035 if (rc) 1036 kfree(lc); 1037 return rc; 1038 } 1039 1040 1041 static int user_cluster_this_node(struct ocfs2_cluster_connection *conn, 1042 unsigned int *this_node) 1043 { 1044 int rc; 1045 struct ocfs2_live_connection *lc = conn->cc_private; 1046 1047 if (lc->oc_type == WITH_CONTROLD) 1048 rc = ocfs2_control_get_this_node(); 1049 else if (lc->oc_type == NO_CONTROLD) 1050 rc = atomic_read(&lc->oc_this_node); 1051 else 1052 rc = -EINVAL; 1053 1054 if (rc < 0) 1055 return rc; 1056 1057 *this_node = rc; 1058 return 0; 1059 } 1060 1061 static const struct ocfs2_stack_operations ocfs2_user_plugin_ops = { 1062 .connect = user_cluster_connect, 1063 .disconnect = user_cluster_disconnect, 1064 .this_node = user_cluster_this_node, 1065 .dlm_lock = user_dlm_lock, 1066 .dlm_unlock = user_dlm_unlock, 1067 .lock_status = user_dlm_lock_status, 1068 .lvb_valid = user_dlm_lvb_valid, 1069 .lock_lvb = user_dlm_lvb, 1070 .plock = user_plock, 1071 .dump_lksb = user_dlm_dump_lksb, 1072 }; 1073 1074 static struct ocfs2_stack_plugin ocfs2_user_plugin = { 1075 .sp_name = "user", 1076 .sp_ops = &ocfs2_user_plugin_ops, 1077 .sp_owner = THIS_MODULE, 1078 }; 1079 1080 1081 static int __init ocfs2_user_plugin_init(void) 1082 { 1083 int rc; 1084 1085 rc = ocfs2_control_init(); 1086 if (!rc) { 1087 rc = ocfs2_stack_glue_register(&ocfs2_user_plugin); 1088 if (rc) 1089 ocfs2_control_exit(); 1090 } 1091 1092 return rc; 1093 } 1094 1095 static void __exit ocfs2_user_plugin_exit(void) 1096 { 1097 ocfs2_stack_glue_unregister(&ocfs2_user_plugin); 1098 ocfs2_control_exit(); 1099 } 1100 1101 MODULE_AUTHOR("Oracle"); 1102 MODULE_DESCRIPTION("ocfs2 driver for userspace cluster stacks"); 1103 MODULE_LICENSE("GPL"); 1104 module_init(ocfs2_user_plugin_init); 1105 module_exit(ocfs2_user_plugin_exit); 1106