1 /* 2 * Copyright (C) 2015, SUSE 3 * 4 * This program is free software; you can redistribute it and/or modify 5 * it under the terms of the GNU General Public License as published by 6 * the Free Software Foundation; either version 2, or (at your option) 7 * any later version. 8 * 9 */ 10 11 12 #include <linux/module.h> 13 #include <linux/kthread.h> 14 #include <linux/dlm.h> 15 #include <linux/sched.h> 16 #include <linux/raid/md_p.h> 17 #include "md.h" 18 #include "md-bitmap.h" 19 #include "md-cluster.h" 20 21 #define LVB_SIZE 64 22 #define NEW_DEV_TIMEOUT 5000 23 24 struct dlm_lock_resource { 25 dlm_lockspace_t *ls; 26 struct dlm_lksb lksb; 27 char *name; /* lock name. */ 28 uint32_t flags; /* flags to pass to dlm_lock() */ 29 wait_queue_head_t sync_locking; /* wait queue for synchronized locking */ 30 bool sync_locking_done; 31 void (*bast)(void *arg, int mode); /* blocking AST function pointer*/ 32 struct mddev *mddev; /* pointing back to mddev. */ 33 int mode; 34 }; 35 36 struct suspend_info { 37 int slot; 38 sector_t lo; 39 sector_t hi; 40 struct list_head list; 41 }; 42 43 struct resync_info { 44 __le64 lo; 45 __le64 hi; 46 }; 47 48 /* md_cluster_info flags */ 49 #define MD_CLUSTER_WAITING_FOR_NEWDISK 1 50 #define MD_CLUSTER_SUSPEND_READ_BALANCING 2 51 #define MD_CLUSTER_BEGIN_JOIN_CLUSTER 3 52 53 /* Lock the send communication. This is done through 54 * bit manipulation as opposed to a mutex in order to 55 * accomodate lock and hold. See next comment. 56 */ 57 #define MD_CLUSTER_SEND_LOCK 4 58 /* If cluster operations (such as adding a disk) must lock the 59 * communication channel, so as to perform extra operations 60 * (update metadata) and no other operation is allowed on the 61 * MD. Token needs to be locked and held until the operation 62 * completes witha md_update_sb(), which would eventually release 63 * the lock. 64 */ 65 #define MD_CLUSTER_SEND_LOCKED_ALREADY 5 66 /* We should receive message after node joined cluster and 67 * set up all the related infos such as bitmap and personality */ 68 #define MD_CLUSTER_ALREADY_IN_CLUSTER 6 69 #define MD_CLUSTER_PENDING_RECV_EVENT 7 70 #define MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD 8 71 72 struct md_cluster_info { 73 struct mddev *mddev; /* the md device which md_cluster_info belongs to */ 74 /* dlm lock space and resources for clustered raid. */ 75 dlm_lockspace_t *lockspace; 76 int slot_number; 77 struct completion completion; 78 struct mutex recv_mutex; 79 struct dlm_lock_resource *bitmap_lockres; 80 struct dlm_lock_resource **other_bitmap_lockres; 81 struct dlm_lock_resource *resync_lockres; 82 struct list_head suspend_list; 83 spinlock_t suspend_lock; 84 struct md_thread *recovery_thread; 85 unsigned long recovery_map; 86 /* communication loc resources */ 87 struct dlm_lock_resource *ack_lockres; 88 struct dlm_lock_resource *message_lockres; 89 struct dlm_lock_resource *token_lockres; 90 struct dlm_lock_resource *no_new_dev_lockres; 91 struct md_thread *recv_thread; 92 struct completion newdisk_completion; 93 wait_queue_head_t wait; 94 unsigned long state; 95 /* record the region in RESYNCING message */ 96 sector_t sync_low; 97 sector_t sync_hi; 98 }; 99 100 enum msg_type { 101 METADATA_UPDATED = 0, 102 RESYNCING, 103 NEWDISK, 104 REMOVE, 105 RE_ADD, 106 BITMAP_NEEDS_SYNC, 107 CHANGE_CAPACITY, 108 }; 109 110 struct cluster_msg { 111 __le32 type; 112 __le32 slot; 113 /* TODO: Unionize this for smaller footprint */ 114 __le64 low; 115 __le64 high; 116 char uuid[16]; 117 __le32 raid_slot; 118 }; 119 120 static void sync_ast(void *arg) 121 { 122 struct dlm_lock_resource *res; 123 124 res = arg; 125 res->sync_locking_done = true; 126 wake_up(&res->sync_locking); 127 } 128 129 static int dlm_lock_sync(struct dlm_lock_resource *res, int mode) 130 { 131 int ret = 0; 132 133 ret = dlm_lock(res->ls, mode, &res->lksb, 134 res->flags, res->name, strlen(res->name), 135 0, sync_ast, res, res->bast); 136 if (ret) 137 return ret; 138 wait_event(res->sync_locking, res->sync_locking_done); 139 res->sync_locking_done = false; 140 if (res->lksb.sb_status == 0) 141 res->mode = mode; 142 return res->lksb.sb_status; 143 } 144 145 static int dlm_unlock_sync(struct dlm_lock_resource *res) 146 { 147 return dlm_lock_sync(res, DLM_LOCK_NL); 148 } 149 150 /* 151 * An variation of dlm_lock_sync, which make lock request could 152 * be interrupted 153 */ 154 static int dlm_lock_sync_interruptible(struct dlm_lock_resource *res, int mode, 155 struct mddev *mddev) 156 { 157 int ret = 0; 158 159 ret = dlm_lock(res->ls, mode, &res->lksb, 160 res->flags, res->name, strlen(res->name), 161 0, sync_ast, res, res->bast); 162 if (ret) 163 return ret; 164 165 wait_event(res->sync_locking, res->sync_locking_done 166 || kthread_should_stop() 167 || test_bit(MD_CLOSING, &mddev->flags)); 168 if (!res->sync_locking_done) { 169 /* 170 * the convert queue contains the lock request when request is 171 * interrupted, and sync_ast could still be run, so need to 172 * cancel the request and reset completion 173 */ 174 ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_CANCEL, 175 &res->lksb, res); 176 res->sync_locking_done = false; 177 if (unlikely(ret != 0)) 178 pr_info("failed to cancel previous lock request " 179 "%s return %d\n", res->name, ret); 180 return -EPERM; 181 } else 182 res->sync_locking_done = false; 183 if (res->lksb.sb_status == 0) 184 res->mode = mode; 185 return res->lksb.sb_status; 186 } 187 188 static struct dlm_lock_resource *lockres_init(struct mddev *mddev, 189 char *name, void (*bastfn)(void *arg, int mode), int with_lvb) 190 { 191 struct dlm_lock_resource *res = NULL; 192 int ret, namelen; 193 struct md_cluster_info *cinfo = mddev->cluster_info; 194 195 res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL); 196 if (!res) 197 return NULL; 198 init_waitqueue_head(&res->sync_locking); 199 res->sync_locking_done = false; 200 res->ls = cinfo->lockspace; 201 res->mddev = mddev; 202 res->mode = DLM_LOCK_IV; 203 namelen = strlen(name); 204 res->name = kzalloc(namelen + 1, GFP_KERNEL); 205 if (!res->name) { 206 pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name); 207 goto out_err; 208 } 209 strlcpy(res->name, name, namelen + 1); 210 if (with_lvb) { 211 res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL); 212 if (!res->lksb.sb_lvbptr) { 213 pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name); 214 goto out_err; 215 } 216 res->flags = DLM_LKF_VALBLK; 217 } 218 219 if (bastfn) 220 res->bast = bastfn; 221 222 res->flags |= DLM_LKF_EXPEDITE; 223 224 ret = dlm_lock_sync(res, DLM_LOCK_NL); 225 if (ret) { 226 pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name); 227 goto out_err; 228 } 229 res->flags &= ~DLM_LKF_EXPEDITE; 230 res->flags |= DLM_LKF_CONVERT; 231 232 return res; 233 out_err: 234 kfree(res->lksb.sb_lvbptr); 235 kfree(res->name); 236 kfree(res); 237 return NULL; 238 } 239 240 static void lockres_free(struct dlm_lock_resource *res) 241 { 242 int ret = 0; 243 244 if (!res) 245 return; 246 247 /* 248 * use FORCEUNLOCK flag, so we can unlock even the lock is on the 249 * waiting or convert queue 250 */ 251 ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_FORCEUNLOCK, 252 &res->lksb, res); 253 if (unlikely(ret != 0)) 254 pr_err("failed to unlock %s return %d\n", res->name, ret); 255 else 256 wait_event(res->sync_locking, res->sync_locking_done); 257 258 kfree(res->name); 259 kfree(res->lksb.sb_lvbptr); 260 kfree(res); 261 } 262 263 static void add_resync_info(struct dlm_lock_resource *lockres, 264 sector_t lo, sector_t hi) 265 { 266 struct resync_info *ri; 267 268 ri = (struct resync_info *)lockres->lksb.sb_lvbptr; 269 ri->lo = cpu_to_le64(lo); 270 ri->hi = cpu_to_le64(hi); 271 } 272 273 static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres) 274 { 275 struct resync_info ri; 276 struct suspend_info *s = NULL; 277 sector_t hi = 0; 278 279 dlm_lock_sync(lockres, DLM_LOCK_CR); 280 memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info)); 281 hi = le64_to_cpu(ri.hi); 282 if (hi > 0) { 283 s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL); 284 if (!s) 285 goto out; 286 s->hi = hi; 287 s->lo = le64_to_cpu(ri.lo); 288 } 289 dlm_unlock_sync(lockres); 290 out: 291 return s; 292 } 293 294 static void recover_bitmaps(struct md_thread *thread) 295 { 296 struct mddev *mddev = thread->mddev; 297 struct md_cluster_info *cinfo = mddev->cluster_info; 298 struct dlm_lock_resource *bm_lockres; 299 char str[64]; 300 int slot, ret; 301 struct suspend_info *s, *tmp; 302 sector_t lo, hi; 303 304 while (cinfo->recovery_map) { 305 slot = fls64((u64)cinfo->recovery_map) - 1; 306 307 snprintf(str, 64, "bitmap%04d", slot); 308 bm_lockres = lockres_init(mddev, str, NULL, 1); 309 if (!bm_lockres) { 310 pr_err("md-cluster: Cannot initialize bitmaps\n"); 311 goto clear_bit; 312 } 313 314 ret = dlm_lock_sync_interruptible(bm_lockres, DLM_LOCK_PW, mddev); 315 if (ret) { 316 pr_err("md-cluster: Could not DLM lock %s: %d\n", 317 str, ret); 318 goto clear_bit; 319 } 320 ret = bitmap_copy_from_slot(mddev, slot, &lo, &hi, true); 321 if (ret) { 322 pr_err("md-cluster: Could not copy data from bitmap %d\n", slot); 323 goto clear_bit; 324 } 325 326 /* Clear suspend_area associated with the bitmap */ 327 spin_lock_irq(&cinfo->suspend_lock); 328 list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list) 329 if (slot == s->slot) { 330 list_del(&s->list); 331 kfree(s); 332 } 333 spin_unlock_irq(&cinfo->suspend_lock); 334 335 if (hi > 0) { 336 if (lo < mddev->recovery_cp) 337 mddev->recovery_cp = lo; 338 /* wake up thread to continue resync in case resync 339 * is not finished */ 340 if (mddev->recovery_cp != MaxSector) { 341 /* 342 * clear the REMOTE flag since we will launch 343 * resync thread in current node. 344 */ 345 clear_bit(MD_RESYNCING_REMOTE, 346 &mddev->recovery); 347 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery); 348 md_wakeup_thread(mddev->thread); 349 } 350 } 351 clear_bit: 352 lockres_free(bm_lockres); 353 clear_bit(slot, &cinfo->recovery_map); 354 } 355 } 356 357 static void recover_prep(void *arg) 358 { 359 struct mddev *mddev = arg; 360 struct md_cluster_info *cinfo = mddev->cluster_info; 361 set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state); 362 } 363 364 static void __recover_slot(struct mddev *mddev, int slot) 365 { 366 struct md_cluster_info *cinfo = mddev->cluster_info; 367 368 set_bit(slot, &cinfo->recovery_map); 369 if (!cinfo->recovery_thread) { 370 cinfo->recovery_thread = md_register_thread(recover_bitmaps, 371 mddev, "recover"); 372 if (!cinfo->recovery_thread) { 373 pr_warn("md-cluster: Could not create recovery thread\n"); 374 return; 375 } 376 } 377 md_wakeup_thread(cinfo->recovery_thread); 378 } 379 380 static void recover_slot(void *arg, struct dlm_slot *slot) 381 { 382 struct mddev *mddev = arg; 383 struct md_cluster_info *cinfo = mddev->cluster_info; 384 385 pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n", 386 mddev->bitmap_info.cluster_name, 387 slot->nodeid, slot->slot, 388 cinfo->slot_number); 389 /* deduct one since dlm slot starts from one while the num of 390 * cluster-md begins with 0 */ 391 __recover_slot(mddev, slot->slot - 1); 392 } 393 394 static void recover_done(void *arg, struct dlm_slot *slots, 395 int num_slots, int our_slot, 396 uint32_t generation) 397 { 398 struct mddev *mddev = arg; 399 struct md_cluster_info *cinfo = mddev->cluster_info; 400 401 cinfo->slot_number = our_slot; 402 /* completion is only need to be complete when node join cluster, 403 * it doesn't need to run during another node's failure */ 404 if (test_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state)) { 405 complete(&cinfo->completion); 406 clear_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state); 407 } 408 clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state); 409 } 410 411 /* the ops is called when node join the cluster, and do lock recovery 412 * if node failure occurs */ 413 static const struct dlm_lockspace_ops md_ls_ops = { 414 .recover_prep = recover_prep, 415 .recover_slot = recover_slot, 416 .recover_done = recover_done, 417 }; 418 419 /* 420 * The BAST function for the ack lock resource 421 * This function wakes up the receive thread in 422 * order to receive and process the message. 423 */ 424 static void ack_bast(void *arg, int mode) 425 { 426 struct dlm_lock_resource *res = arg; 427 struct md_cluster_info *cinfo = res->mddev->cluster_info; 428 429 if (mode == DLM_LOCK_EX) { 430 if (test_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state)) 431 md_wakeup_thread(cinfo->recv_thread); 432 else 433 set_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state); 434 } 435 } 436 437 static void __remove_suspend_info(struct md_cluster_info *cinfo, int slot) 438 { 439 struct suspend_info *s, *tmp; 440 441 list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list) 442 if (slot == s->slot) { 443 list_del(&s->list); 444 kfree(s); 445 break; 446 } 447 } 448 449 static void remove_suspend_info(struct mddev *mddev, int slot) 450 { 451 struct md_cluster_info *cinfo = mddev->cluster_info; 452 mddev->pers->quiesce(mddev, 1); 453 spin_lock_irq(&cinfo->suspend_lock); 454 __remove_suspend_info(cinfo, slot); 455 spin_unlock_irq(&cinfo->suspend_lock); 456 mddev->pers->quiesce(mddev, 0); 457 } 458 459 460 static void process_suspend_info(struct mddev *mddev, 461 int slot, sector_t lo, sector_t hi) 462 { 463 struct md_cluster_info *cinfo = mddev->cluster_info; 464 struct suspend_info *s; 465 466 if (!hi) { 467 /* 468 * clear the REMOTE flag since resync or recovery is finished 469 * in remote node. 470 */ 471 clear_bit(MD_RESYNCING_REMOTE, &mddev->recovery); 472 remove_suspend_info(mddev, slot); 473 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery); 474 md_wakeup_thread(mddev->thread); 475 return; 476 } 477 478 /* 479 * The bitmaps are not same for different nodes 480 * if RESYNCING is happening in one node, then 481 * the node which received the RESYNCING message 482 * probably will perform resync with the region 483 * [lo, hi] again, so we could reduce resync time 484 * a lot if we can ensure that the bitmaps among 485 * different nodes are match up well. 486 * 487 * sync_low/hi is used to record the region which 488 * arrived in the previous RESYNCING message, 489 * 490 * Call bitmap_sync_with_cluster to clear 491 * NEEDED_MASK and set RESYNC_MASK since 492 * resync thread is running in another node, 493 * so we don't need to do the resync again 494 * with the same section */ 495 bitmap_sync_with_cluster(mddev, cinfo->sync_low, 496 cinfo->sync_hi, 497 lo, hi); 498 cinfo->sync_low = lo; 499 cinfo->sync_hi = hi; 500 501 s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL); 502 if (!s) 503 return; 504 s->slot = slot; 505 s->lo = lo; 506 s->hi = hi; 507 mddev->pers->quiesce(mddev, 1); 508 spin_lock_irq(&cinfo->suspend_lock); 509 /* Remove existing entry (if exists) before adding */ 510 __remove_suspend_info(cinfo, slot); 511 list_add(&s->list, &cinfo->suspend_list); 512 spin_unlock_irq(&cinfo->suspend_lock); 513 mddev->pers->quiesce(mddev, 0); 514 } 515 516 static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg) 517 { 518 char disk_uuid[64]; 519 struct md_cluster_info *cinfo = mddev->cluster_info; 520 char event_name[] = "EVENT=ADD_DEVICE"; 521 char raid_slot[16]; 522 char *envp[] = {event_name, disk_uuid, raid_slot, NULL}; 523 int len; 524 525 len = snprintf(disk_uuid, 64, "DEVICE_UUID="); 526 sprintf(disk_uuid + len, "%pU", cmsg->uuid); 527 snprintf(raid_slot, 16, "RAID_DISK=%d", le32_to_cpu(cmsg->raid_slot)); 528 pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot); 529 init_completion(&cinfo->newdisk_completion); 530 set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state); 531 kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp); 532 wait_for_completion_timeout(&cinfo->newdisk_completion, 533 NEW_DEV_TIMEOUT); 534 clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state); 535 } 536 537 538 static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg) 539 { 540 int got_lock = 0; 541 struct md_cluster_info *cinfo = mddev->cluster_info; 542 mddev->good_device_nr = le32_to_cpu(msg->raid_slot); 543 544 dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR); 545 wait_event(mddev->thread->wqueue, 546 (got_lock = mddev_trylock(mddev)) || 547 test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state)); 548 md_reload_sb(mddev, mddev->good_device_nr); 549 if (got_lock) 550 mddev_unlock(mddev); 551 } 552 553 static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg) 554 { 555 struct md_rdev *rdev; 556 557 rcu_read_lock(); 558 rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot)); 559 if (rdev) { 560 set_bit(ClusterRemove, &rdev->flags); 561 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery); 562 md_wakeup_thread(mddev->thread); 563 } 564 else 565 pr_warn("%s: %d Could not find disk(%d) to REMOVE\n", 566 __func__, __LINE__, le32_to_cpu(msg->raid_slot)); 567 rcu_read_unlock(); 568 } 569 570 static void process_readd_disk(struct mddev *mddev, struct cluster_msg *msg) 571 { 572 struct md_rdev *rdev; 573 574 rcu_read_lock(); 575 rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot)); 576 if (rdev && test_bit(Faulty, &rdev->flags)) 577 clear_bit(Faulty, &rdev->flags); 578 else 579 pr_warn("%s: %d Could not find disk(%d) which is faulty", 580 __func__, __LINE__, le32_to_cpu(msg->raid_slot)); 581 rcu_read_unlock(); 582 } 583 584 static int process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg) 585 { 586 int ret = 0; 587 588 if (WARN(mddev->cluster_info->slot_number - 1 == le32_to_cpu(msg->slot), 589 "node %d received it's own msg\n", le32_to_cpu(msg->slot))) 590 return -1; 591 switch (le32_to_cpu(msg->type)) { 592 case METADATA_UPDATED: 593 process_metadata_update(mddev, msg); 594 break; 595 case CHANGE_CAPACITY: 596 set_capacity(mddev->gendisk, mddev->array_sectors); 597 revalidate_disk(mddev->gendisk); 598 break; 599 case RESYNCING: 600 set_bit(MD_RESYNCING_REMOTE, &mddev->recovery); 601 process_suspend_info(mddev, le32_to_cpu(msg->slot), 602 le64_to_cpu(msg->low), 603 le64_to_cpu(msg->high)); 604 break; 605 case NEWDISK: 606 process_add_new_disk(mddev, msg); 607 break; 608 case REMOVE: 609 process_remove_disk(mddev, msg); 610 break; 611 case RE_ADD: 612 process_readd_disk(mddev, msg); 613 break; 614 case BITMAP_NEEDS_SYNC: 615 __recover_slot(mddev, le32_to_cpu(msg->slot)); 616 break; 617 default: 618 ret = -1; 619 pr_warn("%s:%d Received unknown message from %d\n", 620 __func__, __LINE__, msg->slot); 621 } 622 return ret; 623 } 624 625 /* 626 * thread for receiving message 627 */ 628 static void recv_daemon(struct md_thread *thread) 629 { 630 struct md_cluster_info *cinfo = thread->mddev->cluster_info; 631 struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres; 632 struct dlm_lock_resource *message_lockres = cinfo->message_lockres; 633 struct cluster_msg msg; 634 int ret; 635 636 mutex_lock(&cinfo->recv_mutex); 637 /*get CR on Message*/ 638 if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) { 639 pr_err("md/raid1:failed to get CR on MESSAGE\n"); 640 mutex_unlock(&cinfo->recv_mutex); 641 return; 642 } 643 644 /* read lvb and wake up thread to process this message_lockres */ 645 memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg)); 646 ret = process_recvd_msg(thread->mddev, &msg); 647 if (ret) 648 goto out; 649 650 /*release CR on ack_lockres*/ 651 ret = dlm_unlock_sync(ack_lockres); 652 if (unlikely(ret != 0)) 653 pr_info("unlock ack failed return %d\n", ret); 654 /*up-convert to PR on message_lockres*/ 655 ret = dlm_lock_sync(message_lockres, DLM_LOCK_PR); 656 if (unlikely(ret != 0)) 657 pr_info("lock PR on msg failed return %d\n", ret); 658 /*get CR on ack_lockres again*/ 659 ret = dlm_lock_sync(ack_lockres, DLM_LOCK_CR); 660 if (unlikely(ret != 0)) 661 pr_info("lock CR on ack failed return %d\n", ret); 662 out: 663 /*release CR on message_lockres*/ 664 ret = dlm_unlock_sync(message_lockres); 665 if (unlikely(ret != 0)) 666 pr_info("unlock msg failed return %d\n", ret); 667 mutex_unlock(&cinfo->recv_mutex); 668 } 669 670 /* lock_token() 671 * Takes the lock on the TOKEN lock resource so no other 672 * node can communicate while the operation is underway. 673 */ 674 static int lock_token(struct md_cluster_info *cinfo, bool mddev_locked) 675 { 676 int error, set_bit = 0; 677 struct mddev *mddev = cinfo->mddev; 678 679 /* 680 * If resync thread run after raid1d thread, then process_metadata_update 681 * could not continue if raid1d held reconfig_mutex (and raid1d is blocked 682 * since another node already got EX on Token and waitting the EX of Ack), 683 * so let resync wake up thread in case flag is set. 684 */ 685 if (mddev_locked && !test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, 686 &cinfo->state)) { 687 error = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, 688 &cinfo->state); 689 WARN_ON_ONCE(error); 690 md_wakeup_thread(mddev->thread); 691 set_bit = 1; 692 } 693 error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX); 694 if (set_bit) 695 clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state); 696 697 if (error) 698 pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n", 699 __func__, __LINE__, error); 700 701 /* Lock the receive sequence */ 702 mutex_lock(&cinfo->recv_mutex); 703 return error; 704 } 705 706 /* lock_comm() 707 * Sets the MD_CLUSTER_SEND_LOCK bit to lock the send channel. 708 */ 709 static int lock_comm(struct md_cluster_info *cinfo, bool mddev_locked) 710 { 711 wait_event(cinfo->wait, 712 !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state)); 713 714 return lock_token(cinfo, mddev_locked); 715 } 716 717 static void unlock_comm(struct md_cluster_info *cinfo) 718 { 719 WARN_ON(cinfo->token_lockres->mode != DLM_LOCK_EX); 720 mutex_unlock(&cinfo->recv_mutex); 721 dlm_unlock_sync(cinfo->token_lockres); 722 clear_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state); 723 wake_up(&cinfo->wait); 724 } 725 726 /* __sendmsg() 727 * This function performs the actual sending of the message. This function is 728 * usually called after performing the encompassing operation 729 * The function: 730 * 1. Grabs the message lockresource in EX mode 731 * 2. Copies the message to the message LVB 732 * 3. Downconverts message lockresource to CW 733 * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes 734 * and the other nodes read the message. The thread will wait here until all other 735 * nodes have released ack lock resource. 736 * 5. Downconvert ack lockresource to CR 737 */ 738 static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg) 739 { 740 int error; 741 int slot = cinfo->slot_number - 1; 742 743 cmsg->slot = cpu_to_le32(slot); 744 /*get EX on Message*/ 745 error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX); 746 if (error) { 747 pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error); 748 goto failed_message; 749 } 750 751 memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg, 752 sizeof(struct cluster_msg)); 753 /*down-convert EX to CW on Message*/ 754 error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CW); 755 if (error) { 756 pr_err("md-cluster: failed to convert EX to CW on MESSAGE(%d)\n", 757 error); 758 goto failed_ack; 759 } 760 761 /*up-convert CR to EX on Ack*/ 762 error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX); 763 if (error) { 764 pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n", 765 error); 766 goto failed_ack; 767 } 768 769 /*down-convert EX to CR on Ack*/ 770 error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR); 771 if (error) { 772 pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n", 773 error); 774 goto failed_ack; 775 } 776 777 failed_ack: 778 error = dlm_unlock_sync(cinfo->message_lockres); 779 if (unlikely(error != 0)) { 780 pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n", 781 error); 782 /* in case the message can't be released due to some reason */ 783 goto failed_ack; 784 } 785 failed_message: 786 return error; 787 } 788 789 static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg, 790 bool mddev_locked) 791 { 792 int ret; 793 794 lock_comm(cinfo, mddev_locked); 795 ret = __sendmsg(cinfo, cmsg); 796 unlock_comm(cinfo); 797 return ret; 798 } 799 800 static int gather_all_resync_info(struct mddev *mddev, int total_slots) 801 { 802 struct md_cluster_info *cinfo = mddev->cluster_info; 803 int i, ret = 0; 804 struct dlm_lock_resource *bm_lockres; 805 struct suspend_info *s; 806 char str[64]; 807 sector_t lo, hi; 808 809 810 for (i = 0; i < total_slots; i++) { 811 memset(str, '\0', 64); 812 snprintf(str, 64, "bitmap%04d", i); 813 bm_lockres = lockres_init(mddev, str, NULL, 1); 814 if (!bm_lockres) 815 return -ENOMEM; 816 if (i == (cinfo->slot_number - 1)) { 817 lockres_free(bm_lockres); 818 continue; 819 } 820 821 bm_lockres->flags |= DLM_LKF_NOQUEUE; 822 ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW); 823 if (ret == -EAGAIN) { 824 s = read_resync_info(mddev, bm_lockres); 825 if (s) { 826 pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n", 827 __func__, __LINE__, 828 (unsigned long long) s->lo, 829 (unsigned long long) s->hi, i); 830 spin_lock_irq(&cinfo->suspend_lock); 831 s->slot = i; 832 list_add(&s->list, &cinfo->suspend_list); 833 spin_unlock_irq(&cinfo->suspend_lock); 834 } 835 ret = 0; 836 lockres_free(bm_lockres); 837 continue; 838 } 839 if (ret) { 840 lockres_free(bm_lockres); 841 goto out; 842 } 843 844 /* Read the disk bitmap sb and check if it needs recovery */ 845 ret = bitmap_copy_from_slot(mddev, i, &lo, &hi, false); 846 if (ret) { 847 pr_warn("md-cluster: Could not gather bitmaps from slot %d", i); 848 lockres_free(bm_lockres); 849 continue; 850 } 851 if ((hi > 0) && (lo < mddev->recovery_cp)) { 852 set_bit(MD_RECOVERY_NEEDED, &mddev->recovery); 853 mddev->recovery_cp = lo; 854 md_check_recovery(mddev); 855 } 856 857 lockres_free(bm_lockres); 858 } 859 out: 860 return ret; 861 } 862 863 static int join(struct mddev *mddev, int nodes) 864 { 865 struct md_cluster_info *cinfo; 866 int ret, ops_rv; 867 char str[64]; 868 869 cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL); 870 if (!cinfo) 871 return -ENOMEM; 872 873 INIT_LIST_HEAD(&cinfo->suspend_list); 874 spin_lock_init(&cinfo->suspend_lock); 875 init_completion(&cinfo->completion); 876 set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state); 877 init_waitqueue_head(&cinfo->wait); 878 mutex_init(&cinfo->recv_mutex); 879 880 mddev->cluster_info = cinfo; 881 cinfo->mddev = mddev; 882 883 memset(str, 0, 64); 884 sprintf(str, "%pU", mddev->uuid); 885 ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name, 886 DLM_LSFL_FS, LVB_SIZE, 887 &md_ls_ops, mddev, &ops_rv, &cinfo->lockspace); 888 if (ret) 889 goto err; 890 wait_for_completion(&cinfo->completion); 891 if (nodes < cinfo->slot_number) { 892 pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).", 893 cinfo->slot_number, nodes); 894 ret = -ERANGE; 895 goto err; 896 } 897 /* Initiate the communication resources */ 898 ret = -ENOMEM; 899 cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv"); 900 if (!cinfo->recv_thread) { 901 pr_err("md-cluster: cannot allocate memory for recv_thread!\n"); 902 goto err; 903 } 904 cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1); 905 if (!cinfo->message_lockres) 906 goto err; 907 cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0); 908 if (!cinfo->token_lockres) 909 goto err; 910 cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0); 911 if (!cinfo->no_new_dev_lockres) 912 goto err; 913 914 ret = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX); 915 if (ret) { 916 ret = -EAGAIN; 917 pr_err("md-cluster: can't join cluster to avoid lock issue\n"); 918 goto err; 919 } 920 cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0); 921 if (!cinfo->ack_lockres) { 922 ret = -ENOMEM; 923 goto err; 924 } 925 /* get sync CR lock on ACK. */ 926 if (dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR)) 927 pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n", 928 ret); 929 dlm_unlock_sync(cinfo->token_lockres); 930 /* get sync CR lock on no-new-dev. */ 931 if (dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR)) 932 pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret); 933 934 935 pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number); 936 snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1); 937 cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1); 938 if (!cinfo->bitmap_lockres) { 939 ret = -ENOMEM; 940 goto err; 941 } 942 if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) { 943 pr_err("Failed to get bitmap lock\n"); 944 ret = -EINVAL; 945 goto err; 946 } 947 948 cinfo->resync_lockres = lockres_init(mddev, "resync", NULL, 0); 949 if (!cinfo->resync_lockres) { 950 ret = -ENOMEM; 951 goto err; 952 } 953 954 return 0; 955 err: 956 set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state); 957 md_unregister_thread(&cinfo->recovery_thread); 958 md_unregister_thread(&cinfo->recv_thread); 959 lockres_free(cinfo->message_lockres); 960 lockres_free(cinfo->token_lockres); 961 lockres_free(cinfo->ack_lockres); 962 lockres_free(cinfo->no_new_dev_lockres); 963 lockres_free(cinfo->resync_lockres); 964 lockres_free(cinfo->bitmap_lockres); 965 if (cinfo->lockspace) 966 dlm_release_lockspace(cinfo->lockspace, 2); 967 mddev->cluster_info = NULL; 968 kfree(cinfo); 969 return ret; 970 } 971 972 static void load_bitmaps(struct mddev *mddev, int total_slots) 973 { 974 struct md_cluster_info *cinfo = mddev->cluster_info; 975 976 /* load all the node's bitmap info for resync */ 977 if (gather_all_resync_info(mddev, total_slots)) 978 pr_err("md-cluster: failed to gather all resyn infos\n"); 979 set_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state); 980 /* wake up recv thread in case something need to be handled */ 981 if (test_and_clear_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state)) 982 md_wakeup_thread(cinfo->recv_thread); 983 } 984 985 static void resync_bitmap(struct mddev *mddev) 986 { 987 struct md_cluster_info *cinfo = mddev->cluster_info; 988 struct cluster_msg cmsg = {0}; 989 int err; 990 991 cmsg.type = cpu_to_le32(BITMAP_NEEDS_SYNC); 992 err = sendmsg(cinfo, &cmsg, 1); 993 if (err) 994 pr_err("%s:%d: failed to send BITMAP_NEEDS_SYNC message (%d)\n", 995 __func__, __LINE__, err); 996 } 997 998 static void unlock_all_bitmaps(struct mddev *mddev); 999 static int leave(struct mddev *mddev) 1000 { 1001 struct md_cluster_info *cinfo = mddev->cluster_info; 1002 1003 if (!cinfo) 1004 return 0; 1005 1006 /* BITMAP_NEEDS_SYNC message should be sent when node 1007 * is leaving the cluster with dirty bitmap, also we 1008 * can only deliver it when dlm connection is available */ 1009 if (cinfo->slot_number > 0 && mddev->recovery_cp != MaxSector) 1010 resync_bitmap(mddev); 1011 1012 set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state); 1013 md_unregister_thread(&cinfo->recovery_thread); 1014 md_unregister_thread(&cinfo->recv_thread); 1015 lockres_free(cinfo->message_lockres); 1016 lockres_free(cinfo->token_lockres); 1017 lockres_free(cinfo->ack_lockres); 1018 lockres_free(cinfo->no_new_dev_lockres); 1019 lockres_free(cinfo->resync_lockres); 1020 lockres_free(cinfo->bitmap_lockres); 1021 unlock_all_bitmaps(mddev); 1022 dlm_release_lockspace(cinfo->lockspace, 2); 1023 kfree(cinfo); 1024 return 0; 1025 } 1026 1027 /* slot_number(): Returns the MD slot number to use 1028 * DLM starts the slot numbers from 1, wheras cluster-md 1029 * wants the number to be from zero, so we deduct one 1030 */ 1031 static int slot_number(struct mddev *mddev) 1032 { 1033 struct md_cluster_info *cinfo = mddev->cluster_info; 1034 1035 return cinfo->slot_number - 1; 1036 } 1037 1038 /* 1039 * Check if the communication is already locked, else lock the communication 1040 * channel. 1041 * If it is already locked, token is in EX mode, and hence lock_token() 1042 * should not be called. 1043 */ 1044 static int metadata_update_start(struct mddev *mddev) 1045 { 1046 struct md_cluster_info *cinfo = mddev->cluster_info; 1047 int ret; 1048 1049 /* 1050 * metadata_update_start is always called with the protection of 1051 * reconfig_mutex, so set WAITING_FOR_TOKEN here. 1052 */ 1053 ret = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, 1054 &cinfo->state); 1055 WARN_ON_ONCE(ret); 1056 md_wakeup_thread(mddev->thread); 1057 1058 wait_event(cinfo->wait, 1059 !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state) || 1060 test_and_clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state)); 1061 1062 /* If token is already locked, return 0 */ 1063 if (cinfo->token_lockres->mode == DLM_LOCK_EX) { 1064 clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state); 1065 return 0; 1066 } 1067 1068 ret = lock_token(cinfo, 1); 1069 clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state); 1070 return ret; 1071 } 1072 1073 static int metadata_update_finish(struct mddev *mddev) 1074 { 1075 struct md_cluster_info *cinfo = mddev->cluster_info; 1076 struct cluster_msg cmsg; 1077 struct md_rdev *rdev; 1078 int ret = 0; 1079 int raid_slot = -1; 1080 1081 memset(&cmsg, 0, sizeof(cmsg)); 1082 cmsg.type = cpu_to_le32(METADATA_UPDATED); 1083 /* Pick up a good active device number to send. 1084 */ 1085 rdev_for_each(rdev, mddev) 1086 if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) { 1087 raid_slot = rdev->desc_nr; 1088 break; 1089 } 1090 if (raid_slot >= 0) { 1091 cmsg.raid_slot = cpu_to_le32(raid_slot); 1092 ret = __sendmsg(cinfo, &cmsg); 1093 } else 1094 pr_warn("md-cluster: No good device id found to send\n"); 1095 clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state); 1096 unlock_comm(cinfo); 1097 return ret; 1098 } 1099 1100 static void metadata_update_cancel(struct mddev *mddev) 1101 { 1102 struct md_cluster_info *cinfo = mddev->cluster_info; 1103 clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state); 1104 unlock_comm(cinfo); 1105 } 1106 1107 /* 1108 * return 0 if all the bitmaps have the same sync_size 1109 */ 1110 static int cluster_check_sync_size(struct mddev *mddev) 1111 { 1112 int i, rv; 1113 bitmap_super_t *sb; 1114 unsigned long my_sync_size, sync_size = 0; 1115 int node_num = mddev->bitmap_info.nodes; 1116 int current_slot = md_cluster_ops->slot_number(mddev); 1117 struct bitmap *bitmap = mddev->bitmap; 1118 char str[64]; 1119 struct dlm_lock_resource *bm_lockres; 1120 1121 sb = kmap_atomic(bitmap->storage.sb_page); 1122 my_sync_size = sb->sync_size; 1123 kunmap_atomic(sb); 1124 1125 for (i = 0; i < node_num; i++) { 1126 if (i == current_slot) 1127 continue; 1128 1129 bitmap = get_bitmap_from_slot(mddev, i); 1130 if (IS_ERR(bitmap)) { 1131 pr_err("can't get bitmap from slot %d\n", i); 1132 return -1; 1133 } 1134 1135 /* 1136 * If we can hold the bitmap lock of one node then 1137 * the slot is not occupied, update the sb. 1138 */ 1139 snprintf(str, 64, "bitmap%04d", i); 1140 bm_lockres = lockres_init(mddev, str, NULL, 1); 1141 if (!bm_lockres) { 1142 pr_err("md-cluster: Cannot initialize %s\n", str); 1143 bitmap_free(bitmap); 1144 return -1; 1145 } 1146 bm_lockres->flags |= DLM_LKF_NOQUEUE; 1147 rv = dlm_lock_sync(bm_lockres, DLM_LOCK_PW); 1148 if (!rv) 1149 bitmap_update_sb(bitmap); 1150 lockres_free(bm_lockres); 1151 1152 sb = kmap_atomic(bitmap->storage.sb_page); 1153 if (sync_size == 0) 1154 sync_size = sb->sync_size; 1155 else if (sync_size != sb->sync_size) { 1156 kunmap_atomic(sb); 1157 bitmap_free(bitmap); 1158 return -1; 1159 } 1160 kunmap_atomic(sb); 1161 bitmap_free(bitmap); 1162 } 1163 1164 return (my_sync_size == sync_size) ? 0 : -1; 1165 } 1166 1167 /* 1168 * Update the size for cluster raid is a little more complex, we perform it 1169 * by the steps: 1170 * 1. hold token lock and update superblock in initiator node. 1171 * 2. send METADATA_UPDATED msg to other nodes. 1172 * 3. The initiator node continues to check each bitmap's sync_size, if all 1173 * bitmaps have the same value of sync_size, then we can set capacity and 1174 * let other nodes to perform it. If one node can't update sync_size 1175 * accordingly, we need to revert to previous value. 1176 */ 1177 static void update_size(struct mddev *mddev, sector_t old_dev_sectors) 1178 { 1179 struct md_cluster_info *cinfo = mddev->cluster_info; 1180 struct cluster_msg cmsg; 1181 struct md_rdev *rdev; 1182 int ret = 0; 1183 int raid_slot = -1; 1184 1185 md_update_sb(mddev, 1); 1186 lock_comm(cinfo, 1); 1187 1188 memset(&cmsg, 0, sizeof(cmsg)); 1189 cmsg.type = cpu_to_le32(METADATA_UPDATED); 1190 rdev_for_each(rdev, mddev) 1191 if (rdev->raid_disk >= 0 && !test_bit(Faulty, &rdev->flags)) { 1192 raid_slot = rdev->desc_nr; 1193 break; 1194 } 1195 if (raid_slot >= 0) { 1196 cmsg.raid_slot = cpu_to_le32(raid_slot); 1197 /* 1198 * We can only change capiticy after all the nodes can do it, 1199 * so need to wait after other nodes already received the msg 1200 * and handled the change 1201 */ 1202 ret = __sendmsg(cinfo, &cmsg); 1203 if (ret) { 1204 pr_err("%s:%d: failed to send METADATA_UPDATED msg\n", 1205 __func__, __LINE__); 1206 unlock_comm(cinfo); 1207 return; 1208 } 1209 } else { 1210 pr_err("md-cluster: No good device id found to send\n"); 1211 unlock_comm(cinfo); 1212 return; 1213 } 1214 1215 /* 1216 * check the sync_size from other node's bitmap, if sync_size 1217 * have already updated in other nodes as expected, send an 1218 * empty metadata msg to permit the change of capacity 1219 */ 1220 if (cluster_check_sync_size(mddev) == 0) { 1221 memset(&cmsg, 0, sizeof(cmsg)); 1222 cmsg.type = cpu_to_le32(CHANGE_CAPACITY); 1223 ret = __sendmsg(cinfo, &cmsg); 1224 if (ret) 1225 pr_err("%s:%d: failed to send CHANGE_CAPACITY msg\n", 1226 __func__, __LINE__); 1227 set_capacity(mddev->gendisk, mddev->array_sectors); 1228 revalidate_disk(mddev->gendisk); 1229 } else { 1230 /* revert to previous sectors */ 1231 ret = mddev->pers->resize(mddev, old_dev_sectors); 1232 if (!ret) 1233 revalidate_disk(mddev->gendisk); 1234 ret = __sendmsg(cinfo, &cmsg); 1235 if (ret) 1236 pr_err("%s:%d: failed to send METADATA_UPDATED msg\n", 1237 __func__, __LINE__); 1238 } 1239 unlock_comm(cinfo); 1240 } 1241 1242 static int resync_start(struct mddev *mddev) 1243 { 1244 struct md_cluster_info *cinfo = mddev->cluster_info; 1245 return dlm_lock_sync_interruptible(cinfo->resync_lockres, DLM_LOCK_EX, mddev); 1246 } 1247 1248 static int resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi) 1249 { 1250 struct md_cluster_info *cinfo = mddev->cluster_info; 1251 struct resync_info ri; 1252 struct cluster_msg cmsg = {0}; 1253 1254 /* do not send zero again, if we have sent before */ 1255 if (hi == 0) { 1256 memcpy(&ri, cinfo->bitmap_lockres->lksb.sb_lvbptr, sizeof(struct resync_info)); 1257 if (le64_to_cpu(ri.hi) == 0) 1258 return 0; 1259 } 1260 1261 add_resync_info(cinfo->bitmap_lockres, lo, hi); 1262 /* Re-acquire the lock to refresh LVB */ 1263 dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW); 1264 cmsg.type = cpu_to_le32(RESYNCING); 1265 cmsg.low = cpu_to_le64(lo); 1266 cmsg.high = cpu_to_le64(hi); 1267 1268 /* 1269 * mddev_lock is held if resync_info_update is called from 1270 * resync_finish (md_reap_sync_thread -> resync_finish) 1271 */ 1272 if (lo == 0 && hi == 0) 1273 return sendmsg(cinfo, &cmsg, 1); 1274 else 1275 return sendmsg(cinfo, &cmsg, 0); 1276 } 1277 1278 static int resync_finish(struct mddev *mddev) 1279 { 1280 struct md_cluster_info *cinfo = mddev->cluster_info; 1281 1282 clear_bit(MD_RESYNCING_REMOTE, &mddev->recovery); 1283 dlm_unlock_sync(cinfo->resync_lockres); 1284 1285 /* 1286 * If resync thread is interrupted so we can't say resync is finished, 1287 * another node will launch resync thread to continue. 1288 */ 1289 if (test_bit(MD_CLOSING, &mddev->flags)) 1290 return 0; 1291 else 1292 return resync_info_update(mddev, 0, 0); 1293 } 1294 1295 static int area_resyncing(struct mddev *mddev, int direction, 1296 sector_t lo, sector_t hi) 1297 { 1298 struct md_cluster_info *cinfo = mddev->cluster_info; 1299 int ret = 0; 1300 struct suspend_info *s; 1301 1302 if ((direction == READ) && 1303 test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state)) 1304 return 1; 1305 1306 spin_lock_irq(&cinfo->suspend_lock); 1307 if (list_empty(&cinfo->suspend_list)) 1308 goto out; 1309 list_for_each_entry(s, &cinfo->suspend_list, list) 1310 if (hi > s->lo && lo < s->hi) { 1311 ret = 1; 1312 break; 1313 } 1314 out: 1315 spin_unlock_irq(&cinfo->suspend_lock); 1316 return ret; 1317 } 1318 1319 /* add_new_disk() - initiates a disk add 1320 * However, if this fails before writing md_update_sb(), 1321 * add_new_disk_cancel() must be called to release token lock 1322 */ 1323 static int add_new_disk(struct mddev *mddev, struct md_rdev *rdev) 1324 { 1325 struct md_cluster_info *cinfo = mddev->cluster_info; 1326 struct cluster_msg cmsg; 1327 int ret = 0; 1328 struct mdp_superblock_1 *sb = page_address(rdev->sb_page); 1329 char *uuid = sb->device_uuid; 1330 1331 memset(&cmsg, 0, sizeof(cmsg)); 1332 cmsg.type = cpu_to_le32(NEWDISK); 1333 memcpy(cmsg.uuid, uuid, 16); 1334 cmsg.raid_slot = cpu_to_le32(rdev->desc_nr); 1335 lock_comm(cinfo, 1); 1336 ret = __sendmsg(cinfo, &cmsg); 1337 if (ret) { 1338 unlock_comm(cinfo); 1339 return ret; 1340 } 1341 cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE; 1342 ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX); 1343 cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE; 1344 /* Some node does not "see" the device */ 1345 if (ret == -EAGAIN) 1346 ret = -ENOENT; 1347 if (ret) 1348 unlock_comm(cinfo); 1349 else { 1350 dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR); 1351 /* Since MD_CHANGE_DEVS will be set in add_bound_rdev which 1352 * will run soon after add_new_disk, the below path will be 1353 * invoked: 1354 * md_wakeup_thread(mddev->thread) 1355 * -> conf->thread (raid1d) 1356 * -> md_check_recovery -> md_update_sb 1357 * -> metadata_update_start/finish 1358 * MD_CLUSTER_SEND_LOCKED_ALREADY will be cleared eventually. 1359 * 1360 * For other failure cases, metadata_update_cancel and 1361 * add_new_disk_cancel also clear below bit as well. 1362 * */ 1363 set_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state); 1364 wake_up(&cinfo->wait); 1365 } 1366 return ret; 1367 } 1368 1369 static void add_new_disk_cancel(struct mddev *mddev) 1370 { 1371 struct md_cluster_info *cinfo = mddev->cluster_info; 1372 clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state); 1373 unlock_comm(cinfo); 1374 } 1375 1376 static int new_disk_ack(struct mddev *mddev, bool ack) 1377 { 1378 struct md_cluster_info *cinfo = mddev->cluster_info; 1379 1380 if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) { 1381 pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev)); 1382 return -EINVAL; 1383 } 1384 1385 if (ack) 1386 dlm_unlock_sync(cinfo->no_new_dev_lockres); 1387 complete(&cinfo->newdisk_completion); 1388 return 0; 1389 } 1390 1391 static int remove_disk(struct mddev *mddev, struct md_rdev *rdev) 1392 { 1393 struct cluster_msg cmsg = {0}; 1394 struct md_cluster_info *cinfo = mddev->cluster_info; 1395 cmsg.type = cpu_to_le32(REMOVE); 1396 cmsg.raid_slot = cpu_to_le32(rdev->desc_nr); 1397 return sendmsg(cinfo, &cmsg, 1); 1398 } 1399 1400 static int lock_all_bitmaps(struct mddev *mddev) 1401 { 1402 int slot, my_slot, ret, held = 1, i = 0; 1403 char str[64]; 1404 struct md_cluster_info *cinfo = mddev->cluster_info; 1405 1406 cinfo->other_bitmap_lockres = 1407 kcalloc(mddev->bitmap_info.nodes - 1, 1408 sizeof(struct dlm_lock_resource *), GFP_KERNEL); 1409 if (!cinfo->other_bitmap_lockres) { 1410 pr_err("md: can't alloc mem for other bitmap locks\n"); 1411 return 0; 1412 } 1413 1414 my_slot = slot_number(mddev); 1415 for (slot = 0; slot < mddev->bitmap_info.nodes; slot++) { 1416 if (slot == my_slot) 1417 continue; 1418 1419 memset(str, '\0', 64); 1420 snprintf(str, 64, "bitmap%04d", slot); 1421 cinfo->other_bitmap_lockres[i] = lockres_init(mddev, str, NULL, 1); 1422 if (!cinfo->other_bitmap_lockres[i]) 1423 return -ENOMEM; 1424 1425 cinfo->other_bitmap_lockres[i]->flags |= DLM_LKF_NOQUEUE; 1426 ret = dlm_lock_sync(cinfo->other_bitmap_lockres[i], DLM_LOCK_PW); 1427 if (ret) 1428 held = -1; 1429 i++; 1430 } 1431 1432 return held; 1433 } 1434 1435 static void unlock_all_bitmaps(struct mddev *mddev) 1436 { 1437 struct md_cluster_info *cinfo = mddev->cluster_info; 1438 int i; 1439 1440 /* release other node's bitmap lock if they are existed */ 1441 if (cinfo->other_bitmap_lockres) { 1442 for (i = 0; i < mddev->bitmap_info.nodes - 1; i++) { 1443 if (cinfo->other_bitmap_lockres[i]) { 1444 lockres_free(cinfo->other_bitmap_lockres[i]); 1445 } 1446 } 1447 kfree(cinfo->other_bitmap_lockres); 1448 } 1449 } 1450 1451 static int gather_bitmaps(struct md_rdev *rdev) 1452 { 1453 int sn, err; 1454 sector_t lo, hi; 1455 struct cluster_msg cmsg = {0}; 1456 struct mddev *mddev = rdev->mddev; 1457 struct md_cluster_info *cinfo = mddev->cluster_info; 1458 1459 cmsg.type = cpu_to_le32(RE_ADD); 1460 cmsg.raid_slot = cpu_to_le32(rdev->desc_nr); 1461 err = sendmsg(cinfo, &cmsg, 1); 1462 if (err) 1463 goto out; 1464 1465 for (sn = 0; sn < mddev->bitmap_info.nodes; sn++) { 1466 if (sn == (cinfo->slot_number - 1)) 1467 continue; 1468 err = bitmap_copy_from_slot(mddev, sn, &lo, &hi, false); 1469 if (err) { 1470 pr_warn("md-cluster: Could not gather bitmaps from slot %d", sn); 1471 goto out; 1472 } 1473 if ((hi > 0) && (lo < mddev->recovery_cp)) 1474 mddev->recovery_cp = lo; 1475 } 1476 out: 1477 return err; 1478 } 1479 1480 static struct md_cluster_operations cluster_ops = { 1481 .join = join, 1482 .leave = leave, 1483 .slot_number = slot_number, 1484 .resync_start = resync_start, 1485 .resync_finish = resync_finish, 1486 .resync_info_update = resync_info_update, 1487 .metadata_update_start = metadata_update_start, 1488 .metadata_update_finish = metadata_update_finish, 1489 .metadata_update_cancel = metadata_update_cancel, 1490 .area_resyncing = area_resyncing, 1491 .add_new_disk = add_new_disk, 1492 .add_new_disk_cancel = add_new_disk_cancel, 1493 .new_disk_ack = new_disk_ack, 1494 .remove_disk = remove_disk, 1495 .load_bitmaps = load_bitmaps, 1496 .gather_bitmaps = gather_bitmaps, 1497 .lock_all_bitmaps = lock_all_bitmaps, 1498 .unlock_all_bitmaps = unlock_all_bitmaps, 1499 .update_size = update_size, 1500 }; 1501 1502 static int __init cluster_init(void) 1503 { 1504 pr_warn("md-cluster: support raid1 and raid10 (limited support)\n"); 1505 pr_info("Registering Cluster MD functions\n"); 1506 register_md_cluster_operations(&cluster_ops, THIS_MODULE); 1507 return 0; 1508 } 1509 1510 static void cluster_exit(void) 1511 { 1512 unregister_md_cluster_operations(); 1513 } 1514 1515 module_init(cluster_init); 1516 module_exit(cluster_exit); 1517 MODULE_AUTHOR("SUSE"); 1518 MODULE_LICENSE("GPL"); 1519 MODULE_DESCRIPTION("Clustering support for MD"); 1520