// SPDX-License-Identifier: GPL-2.0-only
/*
 * stack_user.c
 *
 * Code which interfaces ocfs2 with fs/dlm and a userspace stack.
 *
 * Copyright (C) 2007 Oracle. All rights reserved.
 */

#include <linux/module.h>
#include <linux/fs.h>
#include <linux/filelock.h>
#include <linux/miscdevice.h>
#include <linux/mutex.h>
#include <linux/slab.h>
#include <linux/reboot.h>
#include <linux/sched.h>
#include <linux/uaccess.h>

#include "stackglue.h"

#include <linux/dlm_plock.h>

/*
 * The control protocol starts with a handshake.  Until the handshake
 * is complete, the control device will fail all write(2)s.
 *
 * The handshake is simple.  First, the client reads until EOF.  Each line
 * of output is a supported protocol tag.  All protocol tags are a single
 * character followed by a two hex digit version number.  Currently the
 * only thing supported is T01, for "Text-based version 0x01".  Next, the
 * client writes the version they would like to use, including the newline.
 * Thus, the protocol tag is 'T01\n'.  If the version tag written is
 * unknown, -EINVAL is returned.  Once the negotiation is complete, the
 * client can start sending messages.
 *
 * The T01 protocol has three messages.  First is the "SETN" message.
 * It has the following syntax:
 *
 *  SETN<space><8-char-hex-nodenum><newline>
 *
 * This is 14 characters.
 *
 * The "SETN" message must be the first message following the protocol.
 * It tells ocfs2_control the local node number.
 *
 * Next comes the "SETV" message.  It has the following syntax:
 *
 *  SETV<space><2-char-hex-major><space><2-char-hex-minor><newline>
 *
 * This is 11 characters.
 *
 * The "SETV" message sets the filesystem locking protocol version as
 * negotiated by the client.  The client negotiates based on the maximum
 * version advertised in /sys/fs/ocfs2/max_locking_protocol.  The major
 * number from the "SETV" message must match
 * ocfs2_user_plugin.sp_max_proto.pv_major, and the minor number
 * must be less than or equal to ocfs2_user_plugin.sp_max_proto.pv_minor.
 *
 * Once this information has been set, mounts will be allowed.  From this
 * point on, the "DOWN" message can be sent for node down notification.
 * It has the following syntax:
 *
 *  DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline>
 *
 * eg:
 *
 *  DOWN 632A924FDD844190BDA93C0DF6B94899 00000001\n
 *
 * This is 47 characters.
 */

/*
 * Whether or not the client has done the handshake.
 * For now, we have just one protocol version.
 */
#define OCFS2_CONTROL_PROTO "T01\n"
#define OCFS2_CONTROL_PROTO_LEN 4

/* Handshake states */
#define OCFS2_CONTROL_HANDSHAKE_INVALID		(0)
#define OCFS2_CONTROL_HANDSHAKE_READ		(1)
#define OCFS2_CONTROL_HANDSHAKE_PROTOCOL	(2)
#define OCFS2_CONTROL_HANDSHAKE_VALID		(3)

/* Messages */
#define OCFS2_CONTROL_MESSAGE_OP_LEN		4
#define OCFS2_CONTROL_MESSAGE_SETNODE_OP	"SETN"
#define OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN	14
#define OCFS2_CONTROL_MESSAGE_SETVERSION_OP	"SETV"
#define OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN	11
#define OCFS2_CONTROL_MESSAGE_DOWN_OP		"DOWN"
#define OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN	47
#define OCFS2_TEXT_UUID_LEN			32
#define OCFS2_CONTROL_MESSAGE_VERNUM_LEN	2
#define OCFS2_CONTROL_MESSAGE_NODENUM_LEN	8
#define VERSION_LOCK				"version_lock"

/*
 * How a live connection learns its node number and protocol version:
 * WITH_CONTROLD uses the values written to the ocfs2_control device,
 * NO_CONTROLD uses the DLM recovery callbacks and the version lock
 * (see user_cluster_connect() and user_cluster_this_node()).
 */
enum ocfs2_connection_type {
	WITH_CONTROLD,
	NO_CONTROLD
};

/*
 * ocfs2_live_connection is refcounted because the filesystem and
 * miscdevice sides can detach in different order.  Let's just be safe.
 *
 * NOTE(review): no reference count field is visible in this structure;
 * the sentence above may be stale -- confirm before relying on it.
 */
struct ocfs2_live_connection {
	/* Entry in ocfs2_live_connection_list, under ocfs2_control_lock */
	struct list_head		oc_list;
	/* Owning cluster connection; cleared when the connection is dropped */
	struct ocfs2_cluster_connection	*oc_conn;
	enum ocfs2_connection_type	oc_type;
	/* Local node id as reported by the DLM recover_done callback */
	atomic_t			oc_this_node;
	int				oc_our_slot;
	/* Lock status block and LVB storage for the VERSION_LOCK resource */
	struct dlm_lksb			oc_version_lksb;
	char				oc_lvb[DLM_LVB_LEN];
	/* Completed by sync_wait_cb() when a synchronous DLM request ends */
	struct completion		oc_sync_wait;
	/* Woken by user_recover_done() once the local node id is known */
	wait_queue_head_t		oc_wait;
};

/* Per-open state of the ocfs2_control device (file->private_data). */
struct ocfs2_control_private {
	struct list_head op_list;
	/* One of the OCFS2_CONTROL_HANDSHAKE_* states */
	int op_state;
	/* Node number from SETN; negative until it has been received */
	int op_this_node;
	/* Version from SETV; pv_major == 0 until it has been received */
	struct ocfs2_protocol_version op_proto;
};

/* SETN<space><8-char-hex-nodenum><newline> */
struct ocfs2_control_message_setn {
	char	tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
	char	space;
	char	nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN];
	char	newline;
};

/* SETV<space><2-char-hex-major><space><2-char-hex-minor><newline> */
struct ocfs2_control_message_setv {
	char	tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
	char	space1;
	char	major[OCFS2_CONTROL_MESSAGE_VERNUM_LEN];
	char	space2;
	char	minor[OCFS2_CONTROL_MESSAGE_VERNUM_LEN];
	char	newline;
};

/* DOWN<space><32-char-cap-hex-uuid><space><8-char-hex-nodenum><newline> */
struct ocfs2_control_message_down {
	char	tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
	char	space1;
	char	uuid[OCFS2_TEXT_UUID_LEN];
	char	space2;
	char	nodestr[OCFS2_CONTROL_MESSAGE_NODENUM_LEN];
	char	newline;
};

/*
 * Overlay of every message a client may write.  All members start with
 * the four byte operation tag, so 'tag' can be inspected before the
 * message type is known.
 */
union ocfs2_control_message {
	char					tag[OCFS2_CONTROL_MESSAGE_OP_LEN];
	struct ocfs2_control_message_setn	u_setn;
	struct ocfs2_control_message_setv	u_setv;
	struct ocfs2_control_message_down	u_down;
};

static struct ocfs2_stack_plugin ocfs2_user_plugin;

/* Number of ocfs2_control opens that completed the handshake */
static atomic_t ocfs2_control_opened;
/* Global node number installed by SETN; -1 while unset */
static int ocfs2_control_this_node = -1;
static struct ocfs2_protocol_version running_proto;

static LIST_HEAD(ocfs2_live_connection_list);
static LIST_HEAD(ocfs2_control_private_list);
static DEFINE_MUTEX(ocfs2_control_lock);

/* Record the handshake state in the per-open private data. */
static inline void ocfs2_control_set_handshake_state(struct file *file,
						     int state)
{
	struct ocfs2_control_private *p = file->private_data;
	p->op_state = state;
}

/* Return the handshake state stored in the per-open private data. */
static inline int ocfs2_control_get_handshake_state(struct file *file)
{
	struct ocfs2_control_private *p = file->private_data;
	return p->op_state;
}

/*
 * Find the live connection whose cluster connection name equals @name.
 * The caller must hold ocfs2_control_lock.  Returns NULL when there is
 * no match.
 */
static struct ocfs2_live_connection *ocfs2_connection_find(const char *name)
{
	size_t len = strlen(name);
	struct ocfs2_live_connection *c;

	BUG_ON(!mutex_is_locked(&ocfs2_control_lock));

	list_for_each_entry(c, &ocfs2_live_connection_list, oc_list) {
		if ((c->oc_conn->cc_namelen == len) &&
		    !strncmp(c->oc_conn->cc_name, name, len))
			return c;
	}

	return NULL;
}

/*
 * ocfs2_live_connection structures are created underneath the ocfs2
 * mount path.  Since the VFS prevents multiple calls to
 * fill_super(), we can't get dupes here.
 *
 * A WITH_CONTROLD connection is only added to the live list while at
 * least one control device handshake is valid; otherwise -ESRCH is
 * returned.  A NO_CONTROLD connection is always added.
 */
static int ocfs2_live_connection_attach(struct ocfs2_cluster_connection *conn,
					struct ocfs2_live_connection *c)
{
	int rc = 0;

	mutex_lock(&ocfs2_control_lock);
	c->oc_conn = conn;

	if ((c->oc_type == NO_CONTROLD) || atomic_read(&ocfs2_control_opened))
		list_add(&c->oc_list, &ocfs2_live_connection_list);
	else {
		printk(KERN_ERR
		       "ocfs2: Userspace control daemon is not present\n");
		rc = -ESRCH;
	}

	mutex_unlock(&ocfs2_control_lock);
	return rc;
}

/*
 * This function disconnects the cluster connection from ocfs2_control.
 * Afterwards, userspace can't affect the cluster connection.
 *
 * The structure is unlinked under ocfs2_control_lock and then freed.
 */
static void ocfs2_live_connection_drop(struct ocfs2_live_connection *c)
{
	mutex_lock(&ocfs2_control_lock);
	list_del_init(&c->oc_list);
	c->oc_conn = NULL;
	mutex_unlock(&ocfs2_control_lock);

	kfree(c);
}

/*
 * Copy one complete command from userspace into @target.
 *
 * Returns 0 on success, -EINVAL if @count is not exactly @target_len or
 * is larger than union ocfs2_control_message, and -EFAULT if the user
 * copy fails.
 */
static int ocfs2_control_cfu(void *target, size_t target_len,
			     const char __user *buf, size_t count)
{
	/* The T01 expects write(2) calls to have exactly one command */
	if ((count != target_len) ||
	    (count > sizeof(union ocfs2_control_message)))
		return -EINVAL;

	if (copy_from_user(target, buf, target_len))
		return -EFAULT;

	return 0;
}

/*
 * Handle the protocol selection write of the handshake.  The write must
 * be exactly OCFS2_CONTROL_PROTO; on success the handshake advances to
 * OCFS2_CONTROL_HANDSHAKE_PROTOCOL and @count is returned.
 */
static ssize_t ocfs2_control_validate_protocol(struct file *file,
					       const char __user *buf,
					       size_t count)
{
	ssize_t ret;
	char kbuf[OCFS2_CONTROL_PROTO_LEN];

	ret = ocfs2_control_cfu(kbuf, OCFS2_CONTROL_PROTO_LEN,
				buf, count);
	if (ret)
		return ret;

	if (strncmp(kbuf, OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN))
		return -EINVAL;

	ocfs2_control_set_handshake_state(file,
					  OCFS2_CONTROL_HANDSHAKE_PROTOCOL);

	return count;
}

/*
 * Forward a node-down notification to the live connection named @uuid.
 * The connection's recovery handler runs with ocfs2_control_lock held.
 * Nothing happens when no connection with that name is live.
 */
static void ocfs2_control_send_down(const char *uuid,
				    int nodenum)
{
	struct ocfs2_live_connection *c;

	mutex_lock(&ocfs2_control_lock);

	c = ocfs2_connection_find(uuid);
	if (c) {
		BUG_ON(c->oc_conn == NULL);
		c->oc_conn->cc_recovery_handler(nodenum,
						c->oc_conn->cc_recovery_data);
	}

	mutex_unlock(&ocfs2_control_lock);
}

/*
 * Called whenever configuration elements are sent to /dev/ocfs2_control.
 * If all configuration elements are present, try to set the global
 * values.  If there is a problem, return an error.  Skip any missing
 * elements, and only bump ocfs2_control_opened when we have all elements
 * and are successful.
 */
static int ocfs2_control_install_private(struct file *file)
{
	int rc = 0;
	int set_p = 1;
	struct ocfs2_control_private *p = file->private_data;

	BUG_ON(p->op_state != OCFS2_CONTROL_HANDSHAKE_PROTOCOL);

	mutex_lock(&ocfs2_control_lock);

	/* Node number: not yet sent, or must agree with the installed one */
	if (p->op_this_node < 0) {
		set_p = 0;
	} else if ((ocfs2_control_this_node >= 0) &&
		   (ocfs2_control_this_node != p->op_this_node)) {
		rc = -EINVAL;
		goto out_unlock;
	}

	/*
	 * Protocol version: not yet sent, or must agree with the running
	 * version while any connection is live.
	 */
	if (!p->op_proto.pv_major) {
		set_p = 0;
	} else if (!list_empty(&ocfs2_live_connection_list) &&
		   ((running_proto.pv_major != p->op_proto.pv_major) ||
		    (running_proto.pv_minor != p->op_proto.pv_minor))) {
		rc = -EINVAL;
		goto out_unlock;
	}

	if (set_p) {
		ocfs2_control_this_node = p->op_this_node;
		running_proto.pv_major = p->op_proto.pv_major;
		running_proto.pv_minor = p->op_proto.pv_minor;
		atomic_inc(&ocfs2_control_opened);
		ocfs2_control_set_handshake_state(file,
					OCFS2_CONTROL_HANDSHAKE_VALID);
	}

out_unlock:
	mutex_unlock(&ocfs2_control_lock);

	return rc;
}

/*
 * Return the node number installed through the control device, or
 * -EINVAL if none has been installed.
 */
static int ocfs2_control_get_this_node(void)
{
	int rc;

	mutex_lock(&ocfs2_control_lock);
	if (ocfs2_control_this_node < 0)
		rc = -EINVAL;
	else
		rc = ocfs2_control_this_node;
	mutex_unlock(&ocfs2_control_lock);

	return rc;
}

/*
 * Handle a "SETN" message: validate its framing, parse the hexadecimal
 * node number and try to install the configuration.  Only accepted while
 * the handshake is in the OCFS2_CONTROL_HANDSHAKE_PROTOCOL state.
 */
static int ocfs2_control_do_setnode_msg(struct file *file,
					struct ocfs2_control_message_setn *msg)
{
	long nodenum;
	struct ocfs2_control_private *p = file->private_data;

	if (ocfs2_control_get_handshake_state(file) !=
	    OCFS2_CONTROL_HANDSHAKE_PROTOCOL)
		return -EINVAL;

	if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP,
		    OCFS2_CONTROL_MESSAGE_OP_LEN))
		return -EINVAL;

	if ((msg->space != ' ') || (msg->newline != '\n'))
		return -EINVAL;
	/* Overwriting the trailing newline NUL-terminates nodestr */
	msg->space = msg->newline = '\0';

	if (kstrtol(msg->nodestr, 16, &nodenum))
		return -EINVAL;

	if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) ||
	    (nodenum > INT_MAX) || (nodenum < 0))
		return -ERANGE;
	p->op_this_node = nodenum;

	return ocfs2_control_install_private(file);
}

/*
 * Handle a "SETV" message: validate its framing, parse the hexadecimal
 * major and minor numbers, check them against the plugin's maximum
 * protocol version and try to install the configuration.  Only accepted
 * while the handshake is in the OCFS2_CONTROL_HANDSHAKE_PROTOCOL state.
 */
static int ocfs2_control_do_setversion_msg(struct file *file,
					   struct ocfs2_control_message_setv *msg)
{
	long major, minor;
	struct ocfs2_control_private *p = file->private_data;
	struct ocfs2_protocol_version *max =
		&ocfs2_user_plugin.sp_max_proto;

	if (ocfs2_control_get_handshake_state(file) !=
	    OCFS2_CONTROL_HANDSHAKE_PROTOCOL)
		return -EINVAL;

	if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP,
		    OCFS2_CONTROL_MESSAGE_OP_LEN))
		return -EINVAL;

	if ((msg->space1 != ' ') || (msg->space2 != ' ') ||
	    (msg->newline != '\n'))
		return -EINVAL;
	/* Overwriting the separators NUL-terminates major and minor */
	msg->space1 = msg->space2 = msg->newline = '\0';

	if (kstrtol(msg->major, 16, &major))
		return -EINVAL;
	if (kstrtol(msg->minor, 16, &minor))
		return -EINVAL;

	/*
	 * The major must be between 1 and 255, inclusive.  The minor
	 * must be between 0 and 255, inclusive.  The version passed in
	 * must be within the maximum version supported by the filesystem.
	 */
	if ((major == LONG_MIN) || (major == LONG_MAX) ||
	    (major > (u8)-1) || (major < 1))
		return -ERANGE;
	if ((minor == LONG_MIN) || (minor == LONG_MAX) ||
	    (minor > (u8)-1) || (minor < 0))
		return -ERANGE;
	if ((major != max->pv_major) ||
	    (minor > max->pv_minor))
		return -EINVAL;

	p->op_proto.pv_major = major;
	p->op_proto.pv_minor = minor;

	return ocfs2_control_install_private(file);
}

/*
 * Handle a "DOWN" message: validate its framing, parse the hexadecimal
 * node number and notify the connection named by the uuid field.  Only
 * accepted once the handshake is OCFS2_CONTROL_HANDSHAKE_VALID.
 */
static int ocfs2_control_do_down_msg(struct file *file,
				     struct ocfs2_control_message_down *msg)
{
	long nodenum;

	if (ocfs2_control_get_handshake_state(file) !=
	    OCFS2_CONTROL_HANDSHAKE_VALID)
		return -EINVAL;

	if (strncmp(msg->tag, OCFS2_CONTROL_MESSAGE_DOWN_OP,
		    OCFS2_CONTROL_MESSAGE_OP_LEN))
		return -EINVAL;

	if ((msg->space1 != ' ') || (msg->space2 != ' ') ||
	    (msg->newline != '\n'))
		return -EINVAL;
	/* Overwriting the separators NUL-terminates uuid and nodestr */
	msg->space1 = msg->space2 = msg->newline = '\0';

	if (kstrtol(msg->nodestr, 16, &nodenum))
		return -EINVAL;

	if ((nodenum == LONG_MIN) || (nodenum == LONG_MAX) ||
	    (nodenum > INT_MAX) || (nodenum < 0))
		return -ERANGE;

	ocfs2_control_send_down(msg->uuid, nodenum);

	return 0;
}

/*
 * Copy one message from userspace and dispatch it on its length and
 * four byte tag.  Returns @count on success or a negative errno.
 */
static ssize_t ocfs2_control_message(struct file *file,
				     const char __user *buf,
				     size_t count)
{
	ssize_t ret;
	union ocfs2_control_message msg;

	/* Try to catch padding issues */
	WARN_ON(offsetof(struct ocfs2_control_message_down, uuid) !=
		(sizeof(msg.u_down.tag) + sizeof(msg.u_down.space1)));

	memset(&msg, 0, sizeof(union ocfs2_control_message));
	ret = ocfs2_control_cfu(&msg, count, buf, count);
	if (ret)
		goto out;

	if ((count == OCFS2_CONTROL_MESSAGE_SETNODE_TOTAL_LEN) &&
	    !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETNODE_OP,
		     OCFS2_CONTROL_MESSAGE_OP_LEN))
		ret = ocfs2_control_do_setnode_msg(file, &msg.u_setn);
	else if ((count == OCFS2_CONTROL_MESSAGE_SETVERSION_TOTAL_LEN) &&
		 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_SETVERSION_OP,
			  OCFS2_CONTROL_MESSAGE_OP_LEN))
		ret = ocfs2_control_do_setversion_msg(file, &msg.u_setv);
	else if ((count == OCFS2_CONTROL_MESSAGE_DOWN_TOTAL_LEN) &&
		 !strncmp(msg.tag, OCFS2_CONTROL_MESSAGE_DOWN_OP,
			  OCFS2_CONTROL_MESSAGE_OP_LEN))
		ret = ocfs2_control_do_down_msg(file, &msg.u_down);
	else
		ret = -EINVAL;

out:
	return ret ? ret : count;
}

/*
 * write(2) handler for the control device.  What a write means depends
 * on the handshake state: rejected before the protocol list was read,
 * treated as the protocol selection right after it, and as a T01
 * message from then on.
 */
static ssize_t ocfs2_control_write(struct file *file,
				   const char __user *buf,
				   size_t count,
				   loff_t *ppos)
{
	ssize_t ret;

	switch (ocfs2_control_get_handshake_state(file)) {
	case OCFS2_CONTROL_HANDSHAKE_INVALID:
		ret = -EINVAL;
		break;

	case OCFS2_CONTROL_HANDSHAKE_READ:
		ret = ocfs2_control_validate_protocol(file, buf,
						      count);
		break;

	case OCFS2_CONTROL_HANDSHAKE_PROTOCOL:
	case OCFS2_CONTROL_HANDSHAKE_VALID:
		ret = ocfs2_control_message(file, buf, count);
		break;

	default:
		BUG();
		ret = -EIO;
		break;
	}

	return ret;
}

/*
 * This is a naive version.  If we ever have a new protocol, we'll expand
 * it.  Probably using seq_file.
 *
 * Once the reader has consumed the whole protocol list the handshake
 * moves to OCFS2_CONTROL_HANDSHAKE_READ.
 */
static ssize_t ocfs2_control_read(struct file *file,
				  char __user *buf,
				  size_t count,
				  loff_t *ppos)
{
	ssize_t ret;

	ret = simple_read_from_buffer(buf, count, ppos,
			OCFS2_CONTROL_PROTO, OCFS2_CONTROL_PROTO_LEN);

	/* Have we read the whole protocol list? */
	if (ret > 0 && *ppos >= OCFS2_CONTROL_PROTO_LEN)
		ocfs2_control_set_handshake_state(file,
						  OCFS2_CONTROL_HANDSHAKE_READ);

	return ret;
}

/*
 * release handler for the control device.  Only opens that reached
 * OCFS2_CONTROL_HANDSHAKE_VALID were counted in ocfs2_control_opened,
 * so only those are uncounted here.  Losing the last valid open while
 * connections are still live triggers an emergency restart.
 */
static int ocfs2_control_release(struct inode *inode, struct file *file)
{
	struct ocfs2_control_private *p = file->private_data;

	mutex_lock(&ocfs2_control_lock);

	if (ocfs2_control_get_handshake_state(file) !=
	    OCFS2_CONTROL_HANDSHAKE_VALID)
		goto out;

	if (atomic_dec_and_test(&ocfs2_control_opened)) {
		if (!list_empty(&ocfs2_live_connection_list)) {
			/* XXX: Do bad things! */
			printk(KERN_ERR
			       "ocfs2: Unexpected release of ocfs2_control!\n"
			       " Loss of cluster connection requires "
			       "an emergency restart!\n");
			emergency_restart();
		}
		/*
		 * Last valid close clears the node number and resets
		 * the locking protocol version
		 */
		ocfs2_control_this_node = -1;
		running_proto.pv_major = 0;
		running_proto.pv_minor = 0;
	}

out:
	list_del_init(&p->op_list);
	file->private_data = NULL;

	mutex_unlock(&ocfs2_control_lock);

	kfree(p);

	return 0;
}

/*
 * open handler for the control device.  Allocates zeroed per-open state
 * (so the handshake starts at OCFS2_CONTROL_HANDSHAKE_INVALID) with the
 * node number marked as unset, and links it on the private list.
 */
static int ocfs2_control_open(struct inode *inode, struct file *file)
{
	struct ocfs2_control_private *p;

	p = kzalloc_obj(struct ocfs2_control_private);
	if (!p)
		return -ENOMEM;
	p->op_this_node = -1;

	mutex_lock(&ocfs2_control_lock);
	file->private_data = p;
	list_add(&p->op_list, &ocfs2_control_private_list);
	mutex_unlock(&ocfs2_control_lock);

	return 0;
}

static const struct file_operations ocfs2_control_fops = {
	.open    = ocfs2_control_open,
	.release = ocfs2_control_release,
	.read    = ocfs2_control_read,
	.write   = ocfs2_control_write,
	.owner   = THIS_MODULE,
	.llseek  = default_llseek,
};

static struct miscdevice ocfs2_control_device = {
	.minor		= MISC_DYNAMIC_MINOR,
	.name		= "ocfs2_control",
	.fops		= &ocfs2_control_fops,
};

static int ocfs2_control_init(void) 621 { 622 int rc; 623 624 atomic_set(&ocfs2_control_opened, 0); 625 626 rc = misc_register(&ocfs2_control_device); 627 if (rc) 628 printk(KERN_ERR 629 "ocfs2: Unable to register ocfs2_control device " 630 "(errno %d)\n", 631 -rc); 632 633 return rc; 634 } 635 636 static void ocfs2_control_exit(void) 637 { 638 misc_deregister(&ocfs2_control_device); 639 } 640 641 static void fsdlm_lock_ast_wrapper(void *astarg) 642 { 643 struct ocfs2_dlm_lksb *lksb = astarg; 644 int status = lksb->lksb_fsdlm.sb_status; 645 646 /* 647 * For now we're punting on the issue of other non-standard errors 648 * where we can't tell if the unlock_ast or lock_ast should be called. 649 * The main "other error" that's possible is EINVAL which means the 650 * function was called with invalid args, which shouldn't be possible 651 * since the caller here is under our control. Other non-standard 652 * errors probably fall into the same category, or otherwise are fatal 653 * which means we can't carry on anyway. 
654 */ 655 656 if (status == -DLM_EUNLOCK || status == -DLM_ECANCEL) 657 lksb->lksb_conn->cc_proto->lp_unlock_ast(lksb, 0); 658 else 659 lksb->lksb_conn->cc_proto->lp_lock_ast(lksb); 660 } 661 662 static void fsdlm_blocking_ast_wrapper(void *astarg, int level) 663 { 664 struct ocfs2_dlm_lksb *lksb = astarg; 665 666 lksb->lksb_conn->cc_proto->lp_blocking_ast(lksb, level); 667 } 668 669 static int user_dlm_lock(struct ocfs2_cluster_connection *conn, 670 int mode, 671 struct ocfs2_dlm_lksb *lksb, 672 u32 flags, 673 void *name, 674 unsigned int namelen) 675 { 676 if (!lksb->lksb_fsdlm.sb_lvbptr) 677 lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb + 678 sizeof(struct dlm_lksb); 679 680 return dlm_lock(conn->cc_lockspace, mode, &lksb->lksb_fsdlm, 681 flags|DLM_LKF_NODLCKWT, name, namelen, 0, 682 fsdlm_lock_ast_wrapper, lksb, 683 fsdlm_blocking_ast_wrapper); 684 } 685 686 static int user_dlm_unlock(struct ocfs2_cluster_connection *conn, 687 struct ocfs2_dlm_lksb *lksb, 688 u32 flags) 689 { 690 return dlm_unlock(conn->cc_lockspace, lksb->lksb_fsdlm.sb_lkid, 691 flags, &lksb->lksb_fsdlm, lksb); 692 } 693 694 static int user_dlm_lock_status(struct ocfs2_dlm_lksb *lksb) 695 { 696 return lksb->lksb_fsdlm.sb_status; 697 } 698 699 static int user_dlm_lvb_valid(struct ocfs2_dlm_lksb *lksb) 700 { 701 int invalid = lksb->lksb_fsdlm.sb_flags & DLM_SBF_VALNOTVALID; 702 703 return !invalid; 704 } 705 706 static void *user_dlm_lvb(struct ocfs2_dlm_lksb *lksb) 707 { 708 if (!lksb->lksb_fsdlm.sb_lvbptr) 709 lksb->lksb_fsdlm.sb_lvbptr = (char *)lksb + 710 sizeof(struct dlm_lksb); 711 return (void *)(lksb->lksb_fsdlm.sb_lvbptr); 712 } 713 714 static void user_dlm_dump_lksb(struct ocfs2_dlm_lksb *lksb) 715 { 716 } 717 718 static int user_plock(struct ocfs2_cluster_connection *conn, 719 u64 ino, 720 struct file *file, 721 int cmd, 722 struct file_lock *fl) 723 { 724 /* 725 * This more or less just demuxes the plock request into any 726 * one of three dlm calls. 
727 * 728 * Internally, fs/dlm will pass these to a misc device, which 729 * a userspace daemon will read and write to. 730 */ 731 732 if (cmd == F_CANCELLK) 733 return dlm_posix_cancel(conn->cc_lockspace, ino, file, fl); 734 else if (IS_GETLK(cmd)) 735 return dlm_posix_get(conn->cc_lockspace, ino, file, fl); 736 else if (lock_is_unlock(fl)) 737 return dlm_posix_unlock(conn->cc_lockspace, ino, file, fl); 738 else 739 return dlm_posix_lock(conn->cc_lockspace, ino, file, cmd, fl); 740 } 741 742 /* 743 * Compare a requested locking protocol version against the current one. 744 * 745 * If the major numbers are different, they are incompatible. 746 * If the current minor is greater than the request, they are incompatible. 747 * If the current minor is less than or equal to the request, they are 748 * compatible, and the requester should run at the current minor version. 749 */ 750 static int fs_protocol_compare(struct ocfs2_protocol_version *existing, 751 struct ocfs2_protocol_version *request) 752 { 753 if (existing->pv_major != request->pv_major) 754 return 1; 755 756 if (existing->pv_minor > request->pv_minor) 757 return 1; 758 759 if (existing->pv_minor < request->pv_minor) 760 request->pv_minor = existing->pv_minor; 761 762 return 0; 763 } 764 765 static void lvb_to_version(char *lvb, struct ocfs2_protocol_version *ver) 766 { 767 struct ocfs2_protocol_version *pv = 768 (struct ocfs2_protocol_version *)lvb; 769 /* 770 * ocfs2_protocol_version has two u8 variables, so we don't 771 * need any endian conversion. 772 */ 773 ver->pv_major = pv->pv_major; 774 ver->pv_minor = pv->pv_minor; 775 } 776 777 static void version_to_lvb(struct ocfs2_protocol_version *ver, char *lvb) 778 { 779 struct ocfs2_protocol_version *pv = 780 (struct ocfs2_protocol_version *)lvb; 781 /* 782 * ocfs2_protocol_version has two u8 variables, so we don't 783 * need any endian conversion. 
784 */ 785 pv->pv_major = ver->pv_major; 786 pv->pv_minor = ver->pv_minor; 787 } 788 789 static void sync_wait_cb(void *arg) 790 { 791 struct ocfs2_cluster_connection *conn = arg; 792 struct ocfs2_live_connection *lc = conn->cc_private; 793 complete(&lc->oc_sync_wait); 794 } 795 796 static int sync_unlock(struct ocfs2_cluster_connection *conn, 797 struct dlm_lksb *lksb, char *name) 798 { 799 int error; 800 struct ocfs2_live_connection *lc = conn->cc_private; 801 802 error = dlm_unlock(conn->cc_lockspace, lksb->sb_lkid, 0, lksb, conn); 803 if (error) { 804 printk(KERN_ERR "%s lkid %x error %d\n", 805 name, lksb->sb_lkid, error); 806 return error; 807 } 808 809 wait_for_completion(&lc->oc_sync_wait); 810 811 if (lksb->sb_status != -DLM_EUNLOCK) { 812 printk(KERN_ERR "%s lkid %x status %d\n", 813 name, lksb->sb_lkid, lksb->sb_status); 814 return -1; 815 } 816 return 0; 817 } 818 819 static int sync_lock(struct ocfs2_cluster_connection *conn, 820 int mode, uint32_t flags, 821 struct dlm_lksb *lksb, char *name) 822 { 823 int error, status; 824 struct ocfs2_live_connection *lc = conn->cc_private; 825 826 error = dlm_lock(conn->cc_lockspace, mode, lksb, flags, 827 name, strlen(name), 828 0, sync_wait_cb, conn, NULL); 829 if (error) { 830 printk(KERN_ERR "%s lkid %x flags %x mode %d error %d\n", 831 name, lksb->sb_lkid, flags, mode, error); 832 return error; 833 } 834 835 wait_for_completion(&lc->oc_sync_wait); 836 837 status = lksb->sb_status; 838 839 if (status && status != -EAGAIN) { 840 printk(KERN_ERR "%s lkid %x flags %x mode %d status %d\n", 841 name, lksb->sb_lkid, flags, mode, status); 842 } 843 844 return status; 845 } 846 847 848 static int version_lock(struct ocfs2_cluster_connection *conn, int mode, 849 int flags) 850 { 851 struct ocfs2_live_connection *lc = conn->cc_private; 852 return sync_lock(conn, mode, flags, 853 &lc->oc_version_lksb, VERSION_LOCK); 854 } 855 856 static int version_unlock(struct ocfs2_cluster_connection *conn) 857 { 858 struct 
ocfs2_live_connection *lc = conn->cc_private; 859 return sync_unlock(conn, &lc->oc_version_lksb, VERSION_LOCK); 860 } 861 862 /* get_protocol_version() 863 * 864 * To exchange ocfs2 versioning, we use the LVB of the version dlm lock. 865 * The algorithm is: 866 * 1. Attempt to take the lock in EX mode (non-blocking). 867 * 2. If successful (which means it is the first mount), write the 868 * version number and downconvert to PR lock. 869 * 3. If unsuccessful (returns -EAGAIN), read the version from the LVB after 870 * taking the PR lock. 871 */ 872 873 static int get_protocol_version(struct ocfs2_cluster_connection *conn) 874 { 875 int ret; 876 struct ocfs2_live_connection *lc = conn->cc_private; 877 struct ocfs2_protocol_version pv; 878 879 running_proto.pv_major = 880 ocfs2_user_plugin.sp_max_proto.pv_major; 881 running_proto.pv_minor = 882 ocfs2_user_plugin.sp_max_proto.pv_minor; 883 884 lc->oc_version_lksb.sb_lvbptr = lc->oc_lvb; 885 ret = version_lock(conn, DLM_LOCK_EX, 886 DLM_LKF_VALBLK|DLM_LKF_NOQUEUE); 887 if (!ret) { 888 conn->cc_version.pv_major = running_proto.pv_major; 889 conn->cc_version.pv_minor = running_proto.pv_minor; 890 version_to_lvb(&running_proto, lc->oc_lvb); 891 version_lock(conn, DLM_LOCK_PR, DLM_LKF_CONVERT|DLM_LKF_VALBLK); 892 } else if (ret == -EAGAIN) { 893 ret = version_lock(conn, DLM_LOCK_PR, DLM_LKF_VALBLK); 894 if (ret) 895 goto out; 896 lvb_to_version(lc->oc_lvb, &pv); 897 898 if ((pv.pv_major != running_proto.pv_major) || 899 (pv.pv_minor > running_proto.pv_minor)) { 900 ret = -EINVAL; 901 goto out; 902 } 903 904 conn->cc_version.pv_major = pv.pv_major; 905 conn->cc_version.pv_minor = pv.pv_minor; 906 } 907 out: 908 return ret; 909 } 910 911 static void user_recover_prep(void *arg) 912 { 913 } 914 915 static void user_recover_slot(void *arg, struct dlm_slot *slot) 916 { 917 struct ocfs2_cluster_connection *conn = arg; 918 printk(KERN_INFO "ocfs2: Node %d/%d down. 
Initiating recovery.\n", 919 slot->nodeid, slot->slot); 920 conn->cc_recovery_handler(slot->nodeid, conn->cc_recovery_data); 921 922 } 923 924 static void user_recover_done(void *arg, struct dlm_slot *slots, 925 int num_slots, int our_slot, 926 uint32_t generation) 927 { 928 struct ocfs2_cluster_connection *conn = arg; 929 struct ocfs2_live_connection *lc = conn->cc_private; 930 int i; 931 932 for (i = 0; i < num_slots; i++) 933 if (slots[i].slot == our_slot) { 934 atomic_set(&lc->oc_this_node, slots[i].nodeid); 935 break; 936 } 937 938 lc->oc_our_slot = our_slot; 939 wake_up(&lc->oc_wait); 940 } 941 942 static const struct dlm_lockspace_ops ocfs2_ls_ops = { 943 .recover_prep = user_recover_prep, 944 .recover_slot = user_recover_slot, 945 .recover_done = user_recover_done, 946 }; 947 948 static int user_cluster_disconnect(struct ocfs2_cluster_connection *conn) 949 { 950 version_unlock(conn); 951 dlm_release_lockspace(conn->cc_lockspace, DLM_RELEASE_NORMAL); 952 conn->cc_lockspace = NULL; 953 ocfs2_live_connection_drop(conn->cc_private); 954 conn->cc_private = NULL; 955 return 0; 956 } 957 958 static int user_cluster_connect(struct ocfs2_cluster_connection *conn) 959 { 960 dlm_lockspace_t *fsdlm; 961 struct ocfs2_live_connection *lc; 962 int rc, ops_rv; 963 964 BUG_ON(conn == NULL); 965 966 lc = kzalloc_obj(struct ocfs2_live_connection); 967 if (!lc) 968 return -ENOMEM; 969 970 init_waitqueue_head(&lc->oc_wait); 971 init_completion(&lc->oc_sync_wait); 972 atomic_set(&lc->oc_this_node, 0); 973 conn->cc_private = lc; 974 lc->oc_type = NO_CONTROLD; 975 976 rc = dlm_new_lockspace(conn->cc_name, conn->cc_cluster_name, 977 DLM_LSFL_NEWEXCL, DLM_LVB_LEN, 978 &ocfs2_ls_ops, conn, &ops_rv, &fsdlm); 979 if (rc) { 980 if (rc == -EEXIST || rc == -EPROTO) 981 printk(KERN_ERR "ocfs2: Unable to create the " 982 "lockspace %s (%d), because a ocfs2-tools " 983 "program is running on this file system " 984 "with the same name lockspace\n", 985 conn->cc_name, rc); 986 goto out; 987 } 
988 989 if (ops_rv == -EOPNOTSUPP) { 990 lc->oc_type = WITH_CONTROLD; 991 printk(KERN_NOTICE "ocfs2: You seem to be using an older " 992 "version of dlm_controld and/or ocfs2-tools." 993 " Please consider upgrading.\n"); 994 } else if (ops_rv) { 995 rc = ops_rv; 996 goto out; 997 } 998 conn->cc_lockspace = fsdlm; 999 1000 rc = ocfs2_live_connection_attach(conn, lc); 1001 if (rc) 1002 goto out; 1003 1004 if (lc->oc_type == NO_CONTROLD) { 1005 rc = get_protocol_version(conn); 1006 if (rc) { 1007 printk(KERN_ERR "ocfs2: Could not determine" 1008 " locking version\n"); 1009 user_cluster_disconnect(conn); 1010 lc = NULL; 1011 goto out; 1012 } 1013 wait_event(lc->oc_wait, (atomic_read(&lc->oc_this_node) > 0)); 1014 } 1015 1016 /* 1017 * running_proto must have been set before we allowed any mounts 1018 * to proceed. 1019 */ 1020 if (fs_protocol_compare(&running_proto, &conn->cc_version)) { 1021 printk(KERN_ERR 1022 "Unable to mount with fs locking protocol version " 1023 "%u.%u because negotiated protocol is %u.%u\n", 1024 conn->cc_version.pv_major, conn->cc_version.pv_minor, 1025 running_proto.pv_major, running_proto.pv_minor); 1026 rc = -EPROTO; 1027 ocfs2_live_connection_drop(lc); 1028 lc = NULL; 1029 } 1030 1031 out: 1032 if (rc) 1033 kfree(lc); 1034 return rc; 1035 } 1036 1037 1038 static int user_cluster_this_node(struct ocfs2_cluster_connection *conn, 1039 unsigned int *this_node) 1040 { 1041 int rc; 1042 struct ocfs2_live_connection *lc = conn->cc_private; 1043 1044 if (lc->oc_type == WITH_CONTROLD) 1045 rc = ocfs2_control_get_this_node(); 1046 else if (lc->oc_type == NO_CONTROLD) 1047 rc = atomic_read(&lc->oc_this_node); 1048 else 1049 rc = -EINVAL; 1050 1051 if (rc < 0) 1052 return rc; 1053 1054 *this_node = rc; 1055 return 0; 1056 } 1057 1058 static const struct ocfs2_stack_operations ocfs2_user_plugin_ops = { 1059 .connect = user_cluster_connect, 1060 .disconnect = user_cluster_disconnect, 1061 .this_node = user_cluster_this_node, 1062 .dlm_lock = 
user_dlm_lock, 1063 .dlm_unlock = user_dlm_unlock, 1064 .lock_status = user_dlm_lock_status, 1065 .lvb_valid = user_dlm_lvb_valid, 1066 .lock_lvb = user_dlm_lvb, 1067 .plock = user_plock, 1068 .dump_lksb = user_dlm_dump_lksb, 1069 }; 1070 1071 static struct ocfs2_stack_plugin ocfs2_user_plugin = { 1072 .sp_name = "user", 1073 .sp_ops = &ocfs2_user_plugin_ops, 1074 .sp_owner = THIS_MODULE, 1075 }; 1076 1077 1078 static int __init ocfs2_user_plugin_init(void) 1079 { 1080 int rc; 1081 1082 rc = ocfs2_control_init(); 1083 if (!rc) { 1084 rc = ocfs2_stack_glue_register(&ocfs2_user_plugin); 1085 if (rc) 1086 ocfs2_control_exit(); 1087 } 1088 1089 return rc; 1090 } 1091 1092 static void __exit ocfs2_user_plugin_exit(void) 1093 { 1094 ocfs2_stack_glue_unregister(&ocfs2_user_plugin); 1095 ocfs2_control_exit(); 1096 } 1097 1098 MODULE_AUTHOR("Oracle"); 1099 MODULE_DESCRIPTION("ocfs2 driver for userspace cluster stacks"); 1100 MODULE_LICENSE("GPL"); 1101 module_init(ocfs2_user_plugin_init); 1102 module_exit(ocfs2_user_plugin_exit); 1103