1 /* 2 * Copyright (C) 2015, SUSE 3 * 4 * This program is free software; you can redistribute it and/or modify 5 * it under the terms of the GNU General Public License as published by 6 * the Free Software Foundation; either version 2, or (at your option) 7 * any later version. 8 * 9 */ 10 11 12 #include <linux/module.h> 13 #include <linux/dlm.h> 14 #include <linux/sched.h> 15 #include <linux/raid/md_p.h> 16 #include "md.h" 17 #include "bitmap.h" 18 #include "md-cluster.h" 19 20 #define LVB_SIZE 64 21 #define NEW_DEV_TIMEOUT 5000 22 23 struct dlm_lock_resource { 24 dlm_lockspace_t *ls; 25 struct dlm_lksb lksb; 26 char *name; /* lock name. */ 27 uint32_t flags; /* flags to pass to dlm_lock() */ 28 struct completion completion; /* completion for synchronized locking */ 29 void (*bast)(void *arg, int mode); /* blocking AST function pointer*/ 30 struct mddev *mddev; /* pointing back to mddev. */ 31 }; 32 33 struct suspend_info { 34 int slot; 35 sector_t lo; 36 sector_t hi; 37 struct list_head list; 38 }; 39 40 struct resync_info { 41 __le64 lo; 42 __le64 hi; 43 }; 44 45 /* md_cluster_info flags */ 46 #define MD_CLUSTER_WAITING_FOR_NEWDISK 1 47 #define MD_CLUSTER_SUSPEND_READ_BALANCING 2 48 49 50 struct md_cluster_info { 51 /* dlm lock space and resources for clustered raid. */ 52 dlm_lockspace_t *lockspace; 53 int slot_number; 54 struct completion completion; 55 struct dlm_lock_resource *sb_lock; 56 struct mutex sb_mutex; 57 struct dlm_lock_resource *bitmap_lockres; 58 struct list_head suspend_list; 59 spinlock_t suspend_lock; 60 struct md_thread *recovery_thread; 61 unsigned long recovery_map; 62 /* communication loc resources */ 63 struct dlm_lock_resource *ack_lockres; 64 struct dlm_lock_resource *message_lockres; 65 struct dlm_lock_resource *token_lockres; 66 struct dlm_lock_resource *no_new_dev_lockres; 67 struct md_thread *recv_thread; 68 struct completion newdisk_completion; 69 unsigned long state; 70 }; 71 72 enum msg_type { 73 METADATA_UPDATED = 0, 74 RESYNCING, 75 NEWDISK, 76 REMOVE, 77 RE_ADD, 78 }; 79 80 struct cluster_msg { 81 int type; 82 int slot; 83 /* TODO: Unionize this for smaller footprint */ 84 sector_t low; 85 sector_t high; 86 char uuid[16]; 87 int raid_slot; 88 }; 89 90 static void sync_ast(void *arg) 91 { 92 struct dlm_lock_resource *res; 93 94 res = (struct dlm_lock_resource *) arg; 95 complete(&res->completion); 96 } 97 98 static int dlm_lock_sync(struct dlm_lock_resource *res, int mode) 99 { 100 int ret = 0; 101 102 init_completion(&res->completion); 103 ret = dlm_lock(res->ls, mode, &res->lksb, 104 res->flags, res->name, strlen(res->name), 105 0, sync_ast, res, res->bast); 106 if (ret) 107 return ret; 108 wait_for_completion(&res->completion); 109 return res->lksb.sb_status; 110 } 111 112 static int dlm_unlock_sync(struct dlm_lock_resource *res) 113 { 114 return dlm_lock_sync(res, DLM_LOCK_NL); 115 } 116 117 static struct dlm_lock_resource *lockres_init(struct mddev *mddev, 118 char *name, void (*bastfn)(void *arg, int mode), int with_lvb) 119 { 120 struct dlm_lock_resource *res = NULL; 121 int ret, namelen; 122 struct md_cluster_info *cinfo = mddev->cluster_info; 123 124 res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL); 125 if (!res) 126 return NULL; 127 res->ls = cinfo->lockspace; 128 res->mddev = mddev; 129 namelen = strlen(name); 130 res->name = kzalloc(namelen + 1, GFP_KERNEL); 131 if (!res->name) { 132 pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name); 133 goto out_err; 134 } 135 strlcpy(res->name, name, namelen + 1); 136 if (with_lvb) { 137 res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL); 138 if (!res->lksb.sb_lvbptr) { 139 pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name); 140 goto out_err; 141 } 142 res->flags = DLM_LKF_VALBLK; 143 } 144 145 if (bastfn) 146 res->bast = bastfn; 147 148 res->flags |= DLM_LKF_EXPEDITE; 149 150 ret = dlm_lock_sync(res, DLM_LOCK_NL); 151 if (ret) { 152 pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name); 153 goto out_err; 154 } 155 res->flags &= ~DLM_LKF_EXPEDITE; 156 res->flags |= DLM_LKF_CONVERT; 157 158 return res; 159 out_err: 160 kfree(res->lksb.sb_lvbptr); 161 kfree(res->name); 162 kfree(res); 163 return NULL; 164 } 165 166 static void lockres_free(struct dlm_lock_resource *res) 167 { 168 if (!res) 169 return; 170 171 init_completion(&res->completion); 172 dlm_unlock(res->ls, res->lksb.sb_lkid, 0, &res->lksb, res); 173 wait_for_completion(&res->completion); 174 175 kfree(res->name); 176 kfree(res->lksb.sb_lvbptr); 177 kfree(res); 178 } 179 180 static char *pretty_uuid(char *dest, char *src) 181 { 182 int i, len = 0; 183 184 for (i = 0; i < 16; i++) { 185 if (i == 4 || i == 6 || i == 8 || i == 10) 186 len += sprintf(dest + len, "-"); 187 len += sprintf(dest + len, "%02x", (__u8)src[i]); 188 } 189 return dest; 190 } 191 192 static void add_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres, 193 sector_t lo, sector_t hi) 194 { 195 struct resync_info *ri; 196 197 ri = (struct resync_info *)lockres->lksb.sb_lvbptr; 198 ri->lo = cpu_to_le64(lo); 199 ri->hi = cpu_to_le64(hi); 200 } 201 202 static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres) 203 { 204 struct resync_info ri; 205 struct suspend_info *s = NULL; 206 sector_t hi = 0; 207 208 dlm_lock_sync(lockres, DLM_LOCK_CR); 209 memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info)); 210 hi = le64_to_cpu(ri.hi); 211 if (ri.hi > 0) { 212 s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL); 213 if (!s) 214 goto out; 215 s->hi = hi; 216 s->lo = le64_to_cpu(ri.lo); 217 } 218 dlm_unlock_sync(lockres); 219 out: 220 return s; 221 } 222 223 static void recover_bitmaps(struct md_thread *thread) 224 { 225 struct mddev *mddev = thread->mddev; 226 struct md_cluster_info *cinfo = mddev->cluster_info; 227 struct dlm_lock_resource *bm_lockres; 228 char str[64]; 229 int slot, ret; 230 struct suspend_info *s, *tmp; 231 sector_t lo, hi; 232 233 while (cinfo->recovery_map) { 234 slot = fls64((u64)cinfo->recovery_map) - 1; 235 236 /* Clear suspend_area associated with the bitmap */ 237 spin_lock_irq(&cinfo->suspend_lock); 238 list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list) 239 if (slot == s->slot) { 240 list_del(&s->list); 241 kfree(s); 242 } 243 spin_unlock_irq(&cinfo->suspend_lock); 244 245 snprintf(str, 64, "bitmap%04d", slot); 246 bm_lockres = lockres_init(mddev, str, NULL, 1); 247 if (!bm_lockres) { 248 pr_err("md-cluster: Cannot initialize bitmaps\n"); 249 goto clear_bit; 250 } 251 252 ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW); 253 if (ret) { 254 pr_err("md-cluster: Could not DLM lock %s: %d\n", 255 str, ret); 256 goto clear_bit; 257 } 258 ret = bitmap_copy_from_slot(mddev, slot, &lo, &hi, true); 259 if (ret) { 260 pr_err("md-cluster: Could not copy data from bitmap %d\n", slot); 261 goto dlm_unlock; 262 } 263 if (hi > 0) { 264 /* TODO:Wait for current resync to get over */ 265 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery); 266 if (lo < mddev->recovery_cp) 267 mddev->recovery_cp = lo; 268 md_check_recovery(mddev); 269 } 270 dlm_unlock: 271 dlm_unlock_sync(bm_lockres); 272 clear_bit: 273 clear_bit(slot, &cinfo->recovery_map); 274 } 275 } 276 277 static void recover_prep(void *arg) 278 { 279 struct mddev *mddev = arg; 280 struct md_cluster_info *cinfo = mddev->cluster_info; 281 set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state); 282 } 283 284 static void recover_slot(void *arg, struct dlm_slot *slot) 285 { 286 struct mddev *mddev = arg; 287 struct md_cluster_info *cinfo = mddev->cluster_info; 288 289 pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n", 290 mddev->bitmap_info.cluster_name, 291 slot->nodeid, slot->slot, 292 cinfo->slot_number); 293 set_bit(slot->slot - 1, &cinfo->recovery_map); 294 if (!cinfo->recovery_thread) { 295 cinfo->recovery_thread = md_register_thread(recover_bitmaps, 296 mddev, "recover"); 297 if (!cinfo->recovery_thread) { 298 pr_warn("md-cluster: Could not create recovery thread\n"); 299 return; 300 } 301 } 302 md_wakeup_thread(cinfo->recovery_thread); 303 } 304 305 static void recover_done(void *arg, struct dlm_slot *slots, 306 int num_slots, int our_slot, 307 uint32_t generation) 308 { 309 struct mddev *mddev = arg; 310 struct md_cluster_info *cinfo = mddev->cluster_info; 311 312 cinfo->slot_number = our_slot; 313 complete(&cinfo->completion); 314 clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state); 315 } 316 317 static const struct dlm_lockspace_ops md_ls_ops = { 318 .recover_prep = recover_prep, 319 .recover_slot = recover_slot, 320 .recover_done = recover_done, 321 }; 322 323 /* 324 * The BAST function for the ack lock resource 325 * This function wakes up the receive thread in 326 * order to receive and process the message. 327 */ 328 static void ack_bast(void *arg, int mode) 329 { 330 struct dlm_lock_resource *res = (struct dlm_lock_resource *)arg; 331 struct md_cluster_info *cinfo = res->mddev->cluster_info; 332 333 if (mode == DLM_LOCK_EX) 334 md_wakeup_thread(cinfo->recv_thread); 335 } 336 337 static void __remove_suspend_info(struct md_cluster_info *cinfo, int slot) 338 { 339 struct suspend_info *s, *tmp; 340 341 list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list) 342 if (slot == s->slot) { 343 pr_info("%s:%d Deleting suspend_info: %d\n", 344 __func__, __LINE__, slot); 345 list_del(&s->list); 346 kfree(s); 347 break; 348 } 349 } 350 351 static void remove_suspend_info(struct md_cluster_info *cinfo, int slot) 352 { 353 spin_lock_irq(&cinfo->suspend_lock); 354 __remove_suspend_info(cinfo, slot); 355 spin_unlock_irq(&cinfo->suspend_lock); 356 } 357 358 359 static void process_suspend_info(struct md_cluster_info *cinfo, 360 int slot, sector_t lo, sector_t hi) 361 { 362 struct suspend_info *s; 363 364 if (!hi) { 365 remove_suspend_info(cinfo, slot); 366 return; 367 } 368 s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL); 369 if (!s) 370 return; 371 s->slot = slot; 372 s->lo = lo; 373 s->hi = hi; 374 spin_lock_irq(&cinfo->suspend_lock); 375 /* Remove existing entry (if exists) before adding */ 376 __remove_suspend_info(cinfo, slot); 377 list_add(&s->list, &cinfo->suspend_list); 378 spin_unlock_irq(&cinfo->suspend_lock); 379 } 380 381 static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg) 382 { 383 char disk_uuid[64]; 384 struct md_cluster_info *cinfo = mddev->cluster_info; 385 char event_name[] = "EVENT=ADD_DEVICE"; 386 char raid_slot[16]; 387 char *envp[] = {event_name, disk_uuid, raid_slot, NULL}; 388 int len; 389 390 len = snprintf(disk_uuid, 64, "DEVICE_UUID="); 391 pretty_uuid(disk_uuid + len, cmsg->uuid); 392 snprintf(raid_slot, 16, "RAID_DISK=%d", cmsg->raid_slot); 393 pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot); 394 init_completion(&cinfo->newdisk_completion); 395 set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state); 396 kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp); 397 wait_for_completion_timeout(&cinfo->newdisk_completion, 398 NEW_DEV_TIMEOUT); 399 clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state); 400 } 401 402 403 static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg) 404 { 405 struct md_cluster_info *cinfo = mddev->cluster_info; 406 407 md_reload_sb(mddev); 408 dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR); 409 } 410 411 static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg) 412 { 413 struct md_rdev *rdev = md_find_rdev_nr_rcu(mddev, msg->raid_slot); 414 415 if (rdev) 416 md_kick_rdev_from_array(rdev); 417 else 418 pr_warn("%s: %d Could not find disk(%d) to REMOVE\n", __func__, __LINE__, msg->raid_slot); 419 } 420 421 static void process_readd_disk(struct mddev *mddev, struct cluster_msg *msg) 422 { 423 struct md_rdev *rdev = md_find_rdev_nr_rcu(mddev, msg->raid_slot); 424 425 if (rdev && test_bit(Faulty, &rdev->flags)) 426 clear_bit(Faulty, &rdev->flags); 427 else 428 pr_warn("%s: %d Could not find disk(%d) which is faulty", __func__, __LINE__, msg->raid_slot); 429 } 430 431 static void process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg) 432 { 433 switch (msg->type) { 434 case METADATA_UPDATED: 435 pr_info("%s: %d Received message: METADATA_UPDATE from %d\n", 436 __func__, __LINE__, msg->slot); 437 process_metadata_update(mddev, msg); 438 break; 439 case RESYNCING: 440 pr_info("%s: %d Received message: RESYNCING from %d\n", 441 __func__, __LINE__, msg->slot); 442 process_suspend_info(mddev->cluster_info, msg->slot, 443 msg->low, msg->high); 444 break; 445 case NEWDISK: 446 pr_info("%s: %d Received message: NEWDISK from %d\n", 447 __func__, __LINE__, msg->slot); 448 process_add_new_disk(mddev, msg); 449 break; 450 case REMOVE: 451 pr_info("%s: %d Received REMOVE from %d\n", 452 __func__, __LINE__, msg->slot); 453 process_remove_disk(mddev, msg); 454 break; 455 case RE_ADD: 456 pr_info("%s: %d Received RE_ADD from %d\n", 457 __func__, __LINE__, msg->slot); 458 process_readd_disk(mddev, msg); 459 break; 460 default: 461 pr_warn("%s:%d Received unknown message from %d\n", 462 __func__, __LINE__, msg->slot); 463 } 464 } 465 466 /* 467 * thread for receiving message 468 */ 469 static void recv_daemon(struct md_thread *thread) 470 { 471 struct md_cluster_info *cinfo = thread->mddev->cluster_info; 472 struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres; 473 struct dlm_lock_resource *message_lockres = cinfo->message_lockres; 474 struct cluster_msg msg; 475 476 /*get CR on Message*/ 477 if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) { 478 pr_err("md/raid1:failed to get CR on MESSAGE\n"); 479 return; 480 } 481 482 /* read lvb and wake up thread to process this message_lockres */ 483 memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg)); 484 process_recvd_msg(thread->mddev, &msg); 485 486 /*release CR on ack_lockres*/ 487 dlm_unlock_sync(ack_lockres); 488 /*up-convert to EX on message_lockres*/ 489 dlm_lock_sync(message_lockres, DLM_LOCK_EX); 490 /*get CR on ack_lockres again*/ 491 dlm_lock_sync(ack_lockres, DLM_LOCK_CR); 492 /*release CR on message_lockres*/ 493 dlm_unlock_sync(message_lockres); 494 } 495 496 /* lock_comm() 497 * Takes the lock on the TOKEN lock resource so no other 498 * node can communicate while the operation is underway. 499 */ 500 static int lock_comm(struct md_cluster_info *cinfo) 501 { 502 int error; 503 504 error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX); 505 if (error) 506 pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n", 507 __func__, __LINE__, error); 508 return error; 509 } 510 511 static void unlock_comm(struct md_cluster_info *cinfo) 512 { 513 dlm_unlock_sync(cinfo->token_lockres); 514 } 515 516 /* __sendmsg() 517 * This function performs the actual sending of the message. This function is 518 * usually called after performing the encompassing operation 519 * The function: 520 * 1. Grabs the message lockresource in EX mode 521 * 2. Copies the message to the message LVB 522 * 3. Downconverts message lockresource to CR 523 * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes 524 * and the other nodes read the message. The thread will wait here until all other 525 * nodes have released ack lock resource. 526 * 5. Downconvert ack lockresource to CR 527 */ 528 static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg) 529 { 530 int error; 531 int slot = cinfo->slot_number - 1; 532 533 cmsg->slot = cpu_to_le32(slot); 534 /*get EX on Message*/ 535 error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX); 536 if (error) { 537 pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error); 538 goto failed_message; 539 } 540 541 memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg, 542 sizeof(struct cluster_msg)); 543 /*down-convert EX to CR on Message*/ 544 error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CR); 545 if (error) { 546 pr_err("md-cluster: failed to convert EX to CR on MESSAGE(%d)\n", 547 error); 548 goto failed_message; 549 } 550 551 /*up-convert CR to EX on Ack*/ 552 error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX); 553 if (error) { 554 pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n", 555 error); 556 goto failed_ack; 557 } 558 559 /*down-convert EX to CR on Ack*/ 560 error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR); 561 if (error) { 562 pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n", 563 error); 564 goto failed_ack; 565 } 566 567 failed_ack: 568 dlm_unlock_sync(cinfo->message_lockres); 569 failed_message: 570 return error; 571 } 572 573 static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg) 574 { 575 int ret; 576 577 lock_comm(cinfo); 578 ret = __sendmsg(cinfo, cmsg); 579 unlock_comm(cinfo); 580 return ret; 581 } 582 583 static int gather_all_resync_info(struct mddev *mddev, int total_slots) 584 { 585 struct md_cluster_info *cinfo = mddev->cluster_info; 586 int i, ret = 0; 587 struct dlm_lock_resource *bm_lockres; 588 struct suspend_info *s; 589 char str[64]; 590 591 592 for (i = 0; i < total_slots; i++) { 593 memset(str, '\0', 64); 594 snprintf(str, 64, "bitmap%04d", i); 595 bm_lockres = lockres_init(mddev, str, NULL, 1); 596 if (!bm_lockres) 597 return -ENOMEM; 598 if (i == (cinfo->slot_number - 1)) 599 continue; 600 601 bm_lockres->flags |= DLM_LKF_NOQUEUE; 602 ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW); 603 if (ret == -EAGAIN) { 604 memset(bm_lockres->lksb.sb_lvbptr, '\0', LVB_SIZE); 605 s = read_resync_info(mddev, bm_lockres); 606 if (s) { 607 pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n", 608 __func__, __LINE__, 609 (unsigned long long) s->lo, 610 (unsigned long long) s->hi, i); 611 spin_lock_irq(&cinfo->suspend_lock); 612 s->slot = i; 613 list_add(&s->list, &cinfo->suspend_list); 614 spin_unlock_irq(&cinfo->suspend_lock); 615 } 616 ret = 0; 617 lockres_free(bm_lockres); 618 continue; 619 } 620 if (ret) 621 goto out; 622 /* TODO: Read the disk bitmap sb and check if it needs recovery */ 623 dlm_unlock_sync(bm_lockres); 624 lockres_free(bm_lockres); 625 } 626 out: 627 return ret; 628 } 629 630 static int join(struct mddev *mddev, int nodes) 631 { 632 struct md_cluster_info *cinfo; 633 int ret, ops_rv; 634 char str[64]; 635 636 if (!try_module_get(THIS_MODULE)) 637 return -ENOENT; 638 639 cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL); 640 if (!cinfo) 641 return -ENOMEM; 642 643 init_completion(&cinfo->completion); 644 645 mutex_init(&cinfo->sb_mutex); 646 mddev->cluster_info = cinfo; 647 648 memset(str, 0, 64); 649 pretty_uuid(str, mddev->uuid); 650 ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name, 651 DLM_LSFL_FS, LVB_SIZE, 652 &md_ls_ops, mddev, &ops_rv, &cinfo->lockspace); 653 if (ret) 654 goto err; 655 wait_for_completion(&cinfo->completion); 656 if (nodes < cinfo->slot_number) { 657 pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).", 658 cinfo->slot_number, nodes); 659 ret = -ERANGE; 660 goto err; 661 } 662 cinfo->sb_lock = lockres_init(mddev, "cmd-super", 663 NULL, 0); 664 if (!cinfo->sb_lock) { 665 ret = -ENOMEM; 666 goto err; 667 } 668 /* Initiate the communication resources */ 669 ret = -ENOMEM; 670 cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv"); 671 if (!cinfo->recv_thread) { 672 pr_err("md-cluster: cannot allocate memory for recv_thread!\n"); 673 goto err; 674 } 675 cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1); 676 if (!cinfo->message_lockres) 677 goto err; 678 cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0); 679 if (!cinfo->token_lockres) 680 goto err; 681 cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0); 682 if (!cinfo->ack_lockres) 683 goto err; 684 cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0); 685 if (!cinfo->no_new_dev_lockres) 686 goto err; 687 688 /* get sync CR lock on ACK. */ 689 if (dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR)) 690 pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n", 691 ret); 692 /* get sync CR lock on no-new-dev. */ 693 if (dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR)) 694 pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret); 695 696 697 pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number); 698 snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1); 699 cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1); 700 if (!cinfo->bitmap_lockres) 701 goto err; 702 if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) { 703 pr_err("Failed to get bitmap lock\n"); 704 ret = -EINVAL; 705 goto err; 706 } 707 708 INIT_LIST_HEAD(&cinfo->suspend_list); 709 spin_lock_init(&cinfo->suspend_lock); 710 711 ret = gather_all_resync_info(mddev, nodes); 712 if (ret) 713 goto err; 714 715 return 0; 716 err: 717 lockres_free(cinfo->message_lockres); 718 lockres_free(cinfo->token_lockres); 719 lockres_free(cinfo->ack_lockres); 720 lockres_free(cinfo->no_new_dev_lockres); 721 lockres_free(cinfo->bitmap_lockres); 722 lockres_free(cinfo->sb_lock); 723 if (cinfo->lockspace) 724 dlm_release_lockspace(cinfo->lockspace, 2); 725 mddev->cluster_info = NULL; 726 kfree(cinfo); 727 module_put(THIS_MODULE); 728 return ret; 729 } 730 731 static int leave(struct mddev *mddev) 732 { 733 struct md_cluster_info *cinfo = mddev->cluster_info; 734 735 if (!cinfo) 736 return 0; 737 md_unregister_thread(&cinfo->recovery_thread); 738 md_unregister_thread(&cinfo->recv_thread); 739 lockres_free(cinfo->message_lockres); 740 lockres_free(cinfo->token_lockres); 741 lockres_free(cinfo->ack_lockres); 742 lockres_free(cinfo->no_new_dev_lockres); 743 lockres_free(cinfo->sb_lock); 744 lockres_free(cinfo->bitmap_lockres); 745 dlm_release_lockspace(cinfo->lockspace, 2); 746 return 0; 747 } 748 749 /* slot_number(): Returns the MD slot number to use 750 * DLM starts the slot numbers from 1, wheras cluster-md 751 * wants the number to be from zero, so we deduct one 752 */ 753 static int slot_number(struct mddev *mddev) 754 { 755 struct md_cluster_info *cinfo = mddev->cluster_info; 756 757 return cinfo->slot_number - 1; 758 } 759 760 static void resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi) 761 { 762 struct md_cluster_info *cinfo = mddev->cluster_info; 763 764 add_resync_info(mddev, cinfo->bitmap_lockres, lo, hi); 765 /* Re-acquire the lock to refresh LVB */ 766 dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW); 767 } 768 769 static int metadata_update_start(struct mddev *mddev) 770 { 771 return lock_comm(mddev->cluster_info); 772 } 773 774 static int metadata_update_finish(struct mddev *mddev) 775 { 776 struct md_cluster_info *cinfo = mddev->cluster_info; 777 struct cluster_msg cmsg; 778 int ret; 779 780 memset(&cmsg, 0, sizeof(cmsg)); 781 cmsg.type = cpu_to_le32(METADATA_UPDATED); 782 ret = __sendmsg(cinfo, &cmsg); 783 unlock_comm(cinfo); 784 return ret; 785 } 786 787 static int metadata_update_cancel(struct mddev *mddev) 788 { 789 struct md_cluster_info *cinfo = mddev->cluster_info; 790 791 return dlm_unlock_sync(cinfo->token_lockres); 792 } 793 794 static int resync_send(struct mddev *mddev, enum msg_type type, 795 sector_t lo, sector_t hi) 796 { 797 struct md_cluster_info *cinfo = mddev->cluster_info; 798 struct cluster_msg cmsg; 799 int slot = cinfo->slot_number - 1; 800 801 pr_info("%s:%d lo: %llu hi: %llu\n", __func__, __LINE__, 802 (unsigned long long)lo, 803 (unsigned long long)hi); 804 resync_info_update(mddev, lo, hi); 805 cmsg.type = cpu_to_le32(type); 806 cmsg.slot = cpu_to_le32(slot); 807 cmsg.low = cpu_to_le64(lo); 808 cmsg.high = cpu_to_le64(hi); 809 return sendmsg(cinfo, &cmsg); 810 } 811 812 static int resync_start(struct mddev *mddev, sector_t lo, sector_t hi) 813 { 814 pr_info("%s:%d\n", __func__, __LINE__); 815 return resync_send(mddev, RESYNCING, lo, hi); 816 } 817 818 static void resync_finish(struct mddev *mddev) 819 { 820 pr_info("%s:%d\n", __func__, __LINE__); 821 resync_send(mddev, RESYNCING, 0, 0); 822 } 823 824 static int area_resyncing(struct mddev *mddev, int direction, 825 sector_t lo, sector_t hi) 826 { 827 struct md_cluster_info *cinfo = mddev->cluster_info; 828 int ret = 0; 829 struct suspend_info *s; 830 831 if ((direction == READ) && 832 test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state)) 833 return 1; 834 835 spin_lock_irq(&cinfo->suspend_lock); 836 if (list_empty(&cinfo->suspend_list)) 837 goto out; 838 list_for_each_entry(s, &cinfo->suspend_list, list) 839 if (hi > s->lo && lo < s->hi) { 840 ret = 1; 841 break; 842 } 843 out: 844 spin_unlock_irq(&cinfo->suspend_lock); 845 return ret; 846 } 847 848 static int add_new_disk_start(struct mddev *mddev, struct md_rdev *rdev) 849 { 850 struct md_cluster_info *cinfo = mddev->cluster_info; 851 struct cluster_msg cmsg; 852 int ret = 0; 853 struct mdp_superblock_1 *sb = page_address(rdev->sb_page); 854 char *uuid = sb->device_uuid; 855 856 memset(&cmsg, 0, sizeof(cmsg)); 857 cmsg.type = cpu_to_le32(NEWDISK); 858 memcpy(cmsg.uuid, uuid, 16); 859 cmsg.raid_slot = rdev->desc_nr; 860 lock_comm(cinfo); 861 ret = __sendmsg(cinfo, &cmsg); 862 if (ret) 863 return ret; 864 cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE; 865 ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX); 866 cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE; 867 /* Some node does not "see" the device */ 868 if (ret == -EAGAIN) 869 ret = -ENOENT; 870 else 871 dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR); 872 return ret; 873 } 874 875 static int add_new_disk_finish(struct mddev *mddev) 876 { 877 struct cluster_msg cmsg; 878 struct md_cluster_info *cinfo = mddev->cluster_info; 879 int ret; 880 /* Write sb and inform others */ 881 md_update_sb(mddev, 1); 882 cmsg.type = METADATA_UPDATED; 883 ret = __sendmsg(cinfo, &cmsg); 884 unlock_comm(cinfo); 885 return ret; 886 } 887 888 static int new_disk_ack(struct mddev *mddev, bool ack) 889 { 890 struct md_cluster_info *cinfo = mddev->cluster_info; 891 892 if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) { 893 pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev)); 894 return -EINVAL; 895 } 896 897 if (ack) 898 dlm_unlock_sync(cinfo->no_new_dev_lockres); 899 complete(&cinfo->newdisk_completion); 900 return 0; 901 } 902 903 static int remove_disk(struct mddev *mddev, struct md_rdev *rdev) 904 { 905 struct cluster_msg cmsg; 906 struct md_cluster_info *cinfo = mddev->cluster_info; 907 cmsg.type = REMOVE; 908 cmsg.raid_slot = rdev->desc_nr; 909 return __sendmsg(cinfo, &cmsg); 910 } 911 912 static int gather_bitmaps(struct md_rdev *rdev) 913 { 914 int sn, err; 915 sector_t lo, hi; 916 struct cluster_msg cmsg; 917 struct mddev *mddev = rdev->mddev; 918 struct md_cluster_info *cinfo = mddev->cluster_info; 919 920 cmsg.type = RE_ADD; 921 cmsg.raid_slot = rdev->desc_nr; 922 err = sendmsg(cinfo, &cmsg); 923 if (err) 924 goto out; 925 926 for (sn = 0; sn < mddev->bitmap_info.nodes; sn++) { 927 if (sn == (cinfo->slot_number - 1)) 928 continue; 929 err = bitmap_copy_from_slot(mddev, sn, &lo, &hi, false); 930 if (err) { 931 pr_warn("md-cluster: Could not gather bitmaps from slot %d", sn); 932 goto out; 933 } 934 if ((hi > 0) && (lo < mddev->recovery_cp)) 935 mddev->recovery_cp = lo; 936 } 937 out: 938 return err; 939 } 940 941 static struct md_cluster_operations cluster_ops = { 942 .join = join, 943 .leave = leave, 944 .slot_number = slot_number, 945 .resync_info_update = resync_info_update, 946 .resync_start = resync_start, 947 .resync_finish = resync_finish, 948 .metadata_update_start = metadata_update_start, 949 .metadata_update_finish = metadata_update_finish, 950 .metadata_update_cancel = metadata_update_cancel, 951 .area_resyncing = area_resyncing, 952 .add_new_disk_start = add_new_disk_start, 953 .add_new_disk_finish = add_new_disk_finish, 954 .new_disk_ack = new_disk_ack, 955 .remove_disk = remove_disk, 956 .gather_bitmaps = gather_bitmaps, 957 }; 958 959 static int __init cluster_init(void) 960 { 961 pr_warn("md-cluster: EXPERIMENTAL. Use with caution\n"); 962 pr_info("Registering Cluster MD functions\n"); 963 register_md_cluster_operations(&cluster_ops, THIS_MODULE); 964 return 0; 965 } 966 967 static void cluster_exit(void) 968 { 969 unregister_md_cluster_operations(); 970 } 971 972 module_init(cluster_init); 973 module_exit(cluster_exit); 974 MODULE_LICENSE("GPL"); 975 MODULE_DESCRIPTION("Clustering support for MD"); 976