/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * Copyright (C) 2004, 2005 Oracle. All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/jiffies.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/bio.h>
#include <linux/blkdev.h>
#include <linux/delay.h>
#include <linux/file.h>
#include <linux/kthread.h>
#include <linux/configfs.h>
#include <linux/random.h>
#include <linux/crc32.h>
#include <linux/time.h>

#include "heartbeat.h"
#include "tcp.h"
#include "nodemanager.h"
#include "quorum.h"

#include "masklog.h"


/*
 * The first heartbeat pass had one global thread that would serialize all hb
 * callback calls. This global serializing sem should only be removed once
 * we've made sure that all callees can deal with being called concurrently
 * from multiple hb region threads.
 */
static DECLARE_RWSEM(o2hb_callback_sem);

/*
 * multiple hb threads are watching multiple regions. A node is live
 * whenever any of the threads sees activity from the node in its region.
 */
static spinlock_t o2hb_live_lock = SPIN_LOCK_UNLOCKED;
static struct list_head o2hb_live_slots[O2NM_MAX_NODES];
static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
static LIST_HEAD(o2hb_node_events);
static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);

static LIST_HEAD(o2hb_all_regions);

static struct o2hb_callback {
        struct list_head list;
} o2hb_callbacks[O2HB_NUM_CB];

static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type);

#define O2HB_DEFAULT_BLOCK_BITS 9

unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD;

/* Only sets a new threshold if there are no active regions.
 *
 * No locking or otherwise interesting code is required for reading
 * o2hb_dead_threshold as it can't change once regions are active and
 * it's not interesting to anyone until then anyway. */
static void o2hb_dead_threshold_set(unsigned int threshold)
{
        if (threshold > O2HB_MIN_DEAD_THRESHOLD) {
                spin_lock(&o2hb_live_lock);
                if (list_empty(&o2hb_all_regions))
                        o2hb_dead_threshold = threshold;
                spin_unlock(&o2hb_live_lock);
        }
}

struct o2hb_node_event {
        struct list_head        hn_item;
        enum o2hb_callback_type hn_event_type;
        struct o2nm_node        *hn_node;
        int                     hn_node_num;
};

struct o2hb_disk_slot {
        struct o2hb_disk_heartbeat_block *ds_raw_block;
        u8                      ds_node_num;
        u64                     ds_last_time;
        u64                     ds_last_generation;
        u16                     ds_equal_samples;
        u16                     ds_changed_samples;
        struct list_head        ds_live_item;
};
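/*
 * Note on the sample counters above: ds_changed_samples counts reads
 * where the slot's sequence advanced, ds_equal_samples reads where it
 * did not. o2hb_check_slot() compares them against the live and dead
 * thresholds to drive the node up/down transitions.
 */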
/* each thread owns a region.. when we're asked to tear down the region
 * we ask the thread to stop, and it cleans up the region as it exits */
struct o2hb_region {
        struct config_item      hr_item;

        struct list_head        hr_all_item;
        unsigned                hr_unclean_stop:1;

        /* protected by the hr_callback_sem */
        struct task_struct      *hr_task;

        unsigned int            hr_blocks;
        unsigned long long      hr_start_block;

        unsigned int            hr_block_bits;
        unsigned int            hr_block_bytes;

        unsigned int            hr_slots_per_page;
        unsigned int            hr_num_pages;

        struct page             **hr_slot_data;
        struct block_device     *hr_bdev;
        struct o2hb_disk_slot   *hr_slots;

        /* let the person setting up hb wait for it to return until it
         * has reached a 'steady' state. This will be fixed when we have
         * a more complete api that doesn't lead to this sort of fragility. */
        atomic_t                hr_steady_iterations;

        char                    hr_dev_name[BDEVNAME_SIZE];

        unsigned int            hr_timeout_ms;

        /* randomized as the region goes up and down so that a node
         * recognizes a node going up and down in one iteration */
        u64                     hr_generation;

        struct work_struct      hr_write_timeout_work;
        unsigned long           hr_last_timeout_start;

        /* Used during o2hb_check_slot to hold a copy of the block
         * being checked because we temporarily have to zero out the
         * crc field. */
        struct o2hb_disk_heartbeat_block *hr_tmp_block;
};

struct o2hb_bio_wait_ctxt {
        atomic_t          wc_num_reqs;
        struct completion wc_io_complete;
};

static void o2hb_write_timeout(void *arg)
{
        struct o2hb_region *reg = arg;

        mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u "
             "milliseconds\n", reg->hr_dev_name,
             jiffies_to_msecs(jiffies - reg->hr_last_timeout_start));
        o2quo_disk_timeout();
}

static void o2hb_arm_write_timeout(struct o2hb_region *reg)
{
        mlog(0, "Queue write timeout for %u ms\n", O2HB_MAX_WRITE_TIMEOUT_MS);

        cancel_delayed_work(&reg->hr_write_timeout_work);
        reg->hr_last_timeout_start = jiffies;
        schedule_delayed_work(&reg->hr_write_timeout_work,
                              msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS));
}

static void o2hb_disarm_write_timeout(struct o2hb_region *reg)
{
        cancel_delayed_work(&reg->hr_write_timeout_work);
        flush_scheduled_work();
}

static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc,
                                      unsigned int num_ios)
{
        atomic_set(&wc->wc_num_reqs, num_ios);
        init_completion(&wc->wc_io_complete);
}
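/*
 * The submitter pre-loads wc_num_reqs with the number of bios it will
 * issue; each completion (or error-path give-back) decrements it, and
 * the final decrement fires wc_io_complete for o2hb_wait_on_io().
 */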
/* Used in error paths too */
static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc,
                                     unsigned int num)
{
        /* sadly atomic_sub_and_test() isn't available on all platforms. The
         * good news is that the fast path only completes one at a time */
        while(num--) {
                if (atomic_dec_and_test(&wc->wc_num_reqs)) {
                        BUG_ON(num > 0);
                        complete(&wc->wc_io_complete);
                }
        }
}

static void o2hb_wait_on_io(struct o2hb_region *reg,
                            struct o2hb_bio_wait_ctxt *wc)
{
        struct address_space *mapping = reg->hr_bdev->bd_inode->i_mapping;

        blk_run_address_space(mapping);

        wait_for_completion(&wc->wc_io_complete);
}

static int o2hb_bio_end_io(struct bio *bio,
                           unsigned int bytes_done,
                           int error)
{
        struct o2hb_bio_wait_ctxt *wc = bio->bi_private;

        if (error)
                mlog(ML_ERROR, "IO Error %d\n", error);

        if (bio->bi_size)
                return 1;

        o2hb_bio_wait_dec(wc, 1);
        return 0;
}

/* Setup a Bio to cover I/O against num_slots slots starting at
 * start_slot. */
static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
                                      struct o2hb_bio_wait_ctxt *wc,
                                      unsigned int start_slot,
                                      unsigned int num_slots)
{
        int i, nr_vecs, len, first_page, last_page;
        unsigned int vec_len, vec_start;
        unsigned int bits = reg->hr_block_bits;
        unsigned int spp = reg->hr_slots_per_page;
        struct bio *bio;
        struct page *page;

        nr_vecs = (num_slots + spp - 1) / spp;

        /* Testing has shown this allocation to take long enough under
         * GFP_KERNEL that the local node can get fenced. It would be
         * nicest if we could pre-allocate these bios and avoid this
         * all together. */
        bio = bio_alloc(GFP_ATOMIC, nr_vecs);
        if (!bio) {
                mlog(ML_ERROR, "Could not alloc slots BIO!\n");
                bio = ERR_PTR(-ENOMEM);
                goto bail;
        }

        /* Must put everything in 512 byte sectors for the bio... */
        bio->bi_sector = (reg->hr_start_block + start_slot) << (bits - 9);
        bio->bi_bdev = reg->hr_bdev;
        bio->bi_private = wc;
        bio->bi_end_io = o2hb_bio_end_io;

        first_page = start_slot / spp;
        last_page = first_page + nr_vecs;
        vec_start = (start_slot << bits) % PAGE_CACHE_SIZE;
        for(i = first_page; i < last_page; i++) {
                page = reg->hr_slot_data[i];

                vec_len = PAGE_CACHE_SIZE;
                /* last page might be short */
                if (((i + 1) * spp) > (start_slot + num_slots))
                        vec_len = ((num_slots + start_slot) % spp) << bits;
                vec_len -= vec_start;

                mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",
                     i, vec_len, vec_start);

                len = bio_add_page(bio, page, vec_len, vec_start);
                if (len != vec_len) {
                        bio_put(bio);
                        bio = ERR_PTR(-EIO);

                        mlog(ML_ERROR, "Error adding page to bio i = %d, "
                             "vec_len = %u, len = %d, start = %u\n",
                             i, vec_len, len, vec_start);
                        goto bail;
                }

                vec_start = 0;
        }

bail:
        return bio;
}
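/*
 * Illustrative example of the math above: with 512-byte blocks
 * (hr_block_bits = 9) and 4k pages, spp is 8. A bio covering slots
 * 0-253 needs nr_vecs = 32 pages; the final page carries only
 * 254 % 8 = 6 slots, so its vec_len works out to 6 << 9 = 3072 bytes.
 * (In practice o2hb_compute_request_limits() below may split such a
 * span across several bios first.)
 */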
/*
 * Compute the maximum number of sectors the bdev can handle in one bio,
 * as a power of two.
 *
 * Stolen from oracleasm, thanks Joel!
 */
static int compute_max_sectors(struct block_device *bdev)
{
        int max_pages, max_sectors, pow_two_sectors;

        struct request_queue *q;

        q = bdev_get_queue(bdev);
        max_pages = q->max_sectors >> (PAGE_SHIFT - 9);
        if (max_pages > BIO_MAX_PAGES)
                max_pages = BIO_MAX_PAGES;
        if (max_pages > q->max_phys_segments)
                max_pages = q->max_phys_segments;
        if (max_pages > q->max_hw_segments)
                max_pages = q->max_hw_segments;
        max_pages--; /* Handle I/Os that straddle a page */

        max_sectors = max_pages << (PAGE_SHIFT - 9);

        /* Why is fls() 1-based???? */
        pow_two_sectors = 1 << (fls(max_sectors) - 1);

        return pow_two_sectors;
}

static inline void o2hb_compute_request_limits(struct o2hb_region *reg,
                                               unsigned int num_slots,
                                               unsigned int *num_bios,
                                               unsigned int *slots_per_bio)
{
        unsigned int max_sectors, io_sectors;

        max_sectors = compute_max_sectors(reg->hr_bdev);

        io_sectors = num_slots << (reg->hr_block_bits - 9);

        *num_bios = (io_sectors + max_sectors - 1) / max_sectors;
        *slots_per_bio = max_sectors >> (reg->hr_block_bits - 9);

        mlog(ML_HB_BIO, "My io size is %u sectors for %u slots. This "
             "device can handle %u sectors of I/O\n", io_sectors, num_slots,
             max_sectors);
        mlog(ML_HB_BIO, "Will need %u bios holding %u slots each\n",
             *num_bios, *slots_per_bio);
}

static int o2hb_read_slots(struct o2hb_region *reg,
                           unsigned int max_slots)
{
        unsigned int num_bios, slots_per_bio, start_slot, num_slots;
        int i, status;
        struct o2hb_bio_wait_ctxt wc;
        struct bio **bios;
        struct bio *bio;

        o2hb_compute_request_limits(reg, max_slots, &num_bios, &slots_per_bio);

        bios = kcalloc(num_bios, sizeof(struct bio *), GFP_KERNEL);
        if (!bios) {
                status = -ENOMEM;
                mlog_errno(status);
                return status;
        }

        o2hb_bio_wait_init(&wc, num_bios);

        num_slots = slots_per_bio;
        for(i = 0; i < num_bios; i++) {
                start_slot = i * slots_per_bio;

                /* adjust num_slots at last bio */
                if (max_slots < (start_slot + num_slots))
                        num_slots = max_slots - start_slot;

                bio = o2hb_setup_one_bio(reg, &wc, start_slot, num_slots);
                if (IS_ERR(bio)) {
                        o2hb_bio_wait_dec(&wc, num_bios - i);

                        status = PTR_ERR(bio);
                        mlog_errno(status);
                        goto bail_and_wait;
                }
                bios[i] = bio;

                submit_bio(READ, bio);
        }

        status = 0;

bail_and_wait:
        o2hb_wait_on_io(reg, &wc);

        if (bios) {
                for(i = 0; i < num_bios; i++)
                        if (bios[i])
                                bio_put(bios[i]);
                kfree(bios);
        }

        return status;
}

static int o2hb_issue_node_write(struct o2hb_region *reg,
                                 struct bio **write_bio,
                                 struct o2hb_bio_wait_ctxt *write_wc)
{
        int status;
        unsigned int slot;
        struct bio *bio;

        o2hb_bio_wait_init(write_wc, 1);

        slot = o2nm_this_node();

        bio = o2hb_setup_one_bio(reg, write_wc, slot, 1);
        if (IS_ERR(bio)) {
                status = PTR_ERR(bio);
                mlog_errno(status);
                goto bail;
        }

        submit_bio(WRITE, bio);

        *write_bio = bio;
        status = 0;
bail:
        return status;
}
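/*
 * Worked example for the limits above (numbers illustrative): reading
 * 254 slots of 512 bytes is 254 sectors of I/O. If the device caps a
 * bio at 128 sectors, o2hb_read_slots() issues (254 + 127) / 128 = 2
 * bios of up to 128 slots each, the second trimmed to the remaining
 * 126 slots.
 */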
static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg,
                                     struct o2hb_disk_heartbeat_block *hb_block)
{
        __le32 old_cksum;
        u32 ret;

        /* We want to compute the block crc with a 0 value in the
         * hb_cksum field. Save it off here and replace after the
         * crc. */
        old_cksum = hb_block->hb_cksum;
        hb_block->hb_cksum = 0;

        ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes);

        hb_block->hb_cksum = old_cksum;

        return ret;
}

static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block)
{
        mlog(ML_ERROR, "Dump slot information: seq = 0x%llx, node = %u, "
             "cksum = 0x%x, generation 0x%llx\n",
             (long long)le64_to_cpu(hb_block->hb_seq),
             hb_block->hb_node, le32_to_cpu(hb_block->hb_cksum),
             (long long)le64_to_cpu(hb_block->hb_generation));
}

static int o2hb_verify_crc(struct o2hb_region *reg,
                           struct o2hb_disk_heartbeat_block *hb_block)
{
        u32 read, computed;

        read = le32_to_cpu(hb_block->hb_cksum);
        computed = o2hb_compute_block_crc_le(reg, hb_block);

        return read == computed;
}

/* We want to make sure that nobody is heartbeating on top of us --
 * this will help detect an invalid configuration. */
static int o2hb_check_last_timestamp(struct o2hb_region *reg)
{
        int node_num, ret;
        struct o2hb_disk_slot *slot;
        struct o2hb_disk_heartbeat_block *hb_block;

        node_num = o2nm_this_node();

        ret = 1;
        slot = &reg->hr_slots[node_num];
        /* Don't check on our 1st timestamp */
        if (slot->ds_last_time) {
                hb_block = slot->ds_raw_block;

                if (le64_to_cpu(hb_block->hb_seq) != slot->ds_last_time)
                        ret = 0;
        }

        return ret;
}

static inline void o2hb_prepare_block(struct o2hb_region *reg,
                                      u64 generation)
{
        int node_num;
        u64 cputime;
        struct o2hb_disk_slot *slot;
        struct o2hb_disk_heartbeat_block *hb_block;

        node_num = o2nm_this_node();
        slot = &reg->hr_slots[node_num];

        hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block;
        memset(hb_block, 0, reg->hr_block_bytes);
        /* TODO: time stuff */
        cputime = CURRENT_TIME.tv_sec;
        if (!cputime)
                cputime = 1;

        hb_block->hb_seq = cpu_to_le64(cputime);
        hb_block->hb_node = node_num;
        hb_block->hb_generation = cpu_to_le64(generation);

        /* This step must always happen last! */
        hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg,
                                                                   hb_block));

        mlog(ML_HB_BIO, "our node generation = 0x%llx, cksum = 0x%x\n",
             (long long)cpu_to_le64(generation),
             le32_to_cpu(hb_block->hb_cksum));
}
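/*
 * Event plumbing: node up/down events are queued on o2hb_node_events
 * under o2hb_live_lock and fired from o2hb_run_event_list() with
 * o2hb_callback_sem held for writing, so callbacks never run
 * concurrently with registration or unregistration.
 */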
static void o2hb_fire_callbacks(struct o2hb_callback *hbcall,
                                struct o2nm_node *node,
                                int idx)
{
        struct list_head *iter;
        struct o2hb_callback_func *f;

        list_for_each(iter, &hbcall->list) {
                f = list_entry(iter, struct o2hb_callback_func, hc_item);
                mlog(ML_HEARTBEAT, "calling funcs %p\n", f);
                (f->hc_func)(node, idx, f->hc_data);
        }
}

/* Will run the list in order until we process the passed event */
static void o2hb_run_event_list(struct o2hb_node_event *queued_event)
{
        int empty;
        struct o2hb_callback *hbcall;
        struct o2hb_node_event *event;

        spin_lock(&o2hb_live_lock);
        empty = list_empty(&queued_event->hn_item);
        spin_unlock(&o2hb_live_lock);
        if (empty)
                return;

        /* Holding callback sem assures we don't alter the callback
         * lists when doing this, and serializes ourselves with other
         * processes wanting callbacks. */
        down_write(&o2hb_callback_sem);

        spin_lock(&o2hb_live_lock);
        while (!list_empty(&o2hb_node_events)
               && !list_empty(&queued_event->hn_item)) {
                event = list_entry(o2hb_node_events.next,
                                   struct o2hb_node_event,
                                   hn_item);
                list_del_init(&event->hn_item);
                spin_unlock(&o2hb_live_lock);

                mlog(ML_HEARTBEAT, "Node %s event for %d\n",
                     event->hn_event_type == O2HB_NODE_UP_CB ? "UP" : "DOWN",
                     event->hn_node_num);

                hbcall = hbcall_from_type(event->hn_event_type);

                /* We should *never* have gotten on to the list with a
                 * bad type... This isn't something that we should try
                 * to recover from. */
                BUG_ON(IS_ERR(hbcall));

                o2hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num);

                spin_lock(&o2hb_live_lock);
        }
        spin_unlock(&o2hb_live_lock);

        up_write(&o2hb_callback_sem);
}

static void o2hb_queue_node_event(struct o2hb_node_event *event,
                                  enum o2hb_callback_type type,
                                  struct o2nm_node *node,
                                  int node_num)
{
        assert_spin_locked(&o2hb_live_lock);

        event->hn_event_type = type;
        event->hn_node = node;
        event->hn_node_num = node_num;

        mlog(ML_HEARTBEAT, "Queue node %s event for node %d\n",
             type == O2HB_NODE_UP_CB ? "UP" : "DOWN", node_num);

        list_add_tail(&event->hn_item, &o2hb_node_events);
}

static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot)
{
        struct o2hb_node_event event =
                { .hn_item = LIST_HEAD_INIT(event.hn_item), };
        struct o2nm_node *node;

        node = o2nm_get_node_by_num(slot->ds_node_num);
        if (!node)
                return;

        spin_lock(&o2hb_live_lock);
        if (!list_empty(&slot->ds_live_item)) {
                mlog(ML_HEARTBEAT, "Shutdown, node %d leaves region\n",
                     slot->ds_node_num);

                list_del_init(&slot->ds_live_item);

                if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
                        clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);

                        o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
                                              slot->ds_node_num);
                }
        }
        spin_unlock(&o2hb_live_lock);

        o2hb_run_event_list(&event);

        o2nm_node_put(node);
}
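/*
 * The per-slot state machine implemented below, in brief: a dead slot
 * comes alive after O2HB_LIVE_THRESHOLD changed samples, and a live
 * slot goes dead after o2hb_dead_threshold equal samples or when its
 * generation changes. Seeing any change resets the equal-sample count.
 */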
static int o2hb_check_slot(struct o2hb_region *reg,
                           struct o2hb_disk_slot *slot)
{
        int changed = 0, gen_changed = 0;
        struct o2hb_node_event event =
                { .hn_item = LIST_HEAD_INIT(event.hn_item), };
        struct o2nm_node *node;
        struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block;
        u64 cputime;

        memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes);

        /* Is this correct? Do we assume that the node doesn't exist
         * if we're not configured for it? */
        node = o2nm_get_node_by_num(slot->ds_node_num);
        if (!node)
                return 0;

        if (!o2hb_verify_crc(reg, hb_block)) {
                /* all paths from here will drop o2hb_live_lock for
                 * us. */
                spin_lock(&o2hb_live_lock);

                /* Don't print an error on the console in this case -
                 * a freshly formatted heartbeat area will not have a
                 * crc set on it. */
                if (list_empty(&slot->ds_live_item))
                        goto out;

                /* The node is live but pushed out a bad crc. We
                 * consider it a transient miss but don't populate any
                 * other values as they may be junk. */
                mlog(ML_ERROR, "Node %d has written a bad crc to %s\n",
                     slot->ds_node_num, reg->hr_dev_name);
                o2hb_dump_slot(hb_block);

                slot->ds_equal_samples++;
                goto fire_callbacks;
        }

        /* we don't care if these wrap.. the state transitions below
         * clear at the right places */
        cputime = le64_to_cpu(hb_block->hb_seq);
        if (slot->ds_last_time != cputime)
                slot->ds_changed_samples++;
        else
                slot->ds_equal_samples++;
        slot->ds_last_time = cputime;

        /* The node changed heartbeat generations. We assume this to
         * mean it dropped off but came back before we timed out. We
         * want to consider it down for the time being but don't want
         * to lose any changed_samples state we might build up to
         * considering it live again. */
        if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) {
                gen_changed = 1;
                slot->ds_equal_samples = 0;
                mlog(ML_HEARTBEAT, "Node %d changed generation (0x%llx "
                     "to 0x%llx)\n", slot->ds_node_num,
                     (long long)slot->ds_last_generation,
                     (long long)le64_to_cpu(hb_block->hb_generation));
        }

        slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);

        mlog(ML_HEARTBEAT, "Slot %d gen 0x%llx cksum 0x%x "
             "seq %llu last %llu changed %u equal %u\n",
             slot->ds_node_num, (long long)slot->ds_last_generation,
             le32_to_cpu(hb_block->hb_cksum),
             (unsigned long long)le64_to_cpu(hb_block->hb_seq),
             (unsigned long long)slot->ds_last_time, slot->ds_changed_samples,
             slot->ds_equal_samples);

        spin_lock(&o2hb_live_lock);

fire_callbacks:
        /* dead nodes only come to life after some number of
         * changes at any time during their dead time */
        if (list_empty(&slot->ds_live_item) &&
            slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) {
                mlog(ML_HEARTBEAT, "Node %d (id 0x%llx) joined my region\n",
                     slot->ds_node_num, (long long)slot->ds_last_generation);

                /* first on the list generates a callback */
                if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
                        set_bit(slot->ds_node_num, o2hb_live_node_bitmap);

                        o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node,
                                              slot->ds_node_num);

                        changed = 1;
                }

                list_add_tail(&slot->ds_live_item,
                              &o2hb_live_slots[slot->ds_node_num]);

                slot->ds_equal_samples = 0;
                goto out;
        }

        /* if the list is dead, we're done.. */
        if (list_empty(&slot->ds_live_item))
                goto out;

        /* live nodes only go dead after enough consecutive missed
         * samples.. reset the missed counter whenever we see
         * activity */
        if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) {
                mlog(ML_HEARTBEAT, "Node %d left my region\n",
                     slot->ds_node_num);

                /* last off the live_slot generates a callback */
                list_del_init(&slot->ds_live_item);
                if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
                        clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);

                        o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
                                              slot->ds_node_num);

                        changed = 1;
                }

                /* We don't clear this because the node is still
                 * actually writing new blocks. */
                if (!gen_changed)
                        slot->ds_changed_samples = 0;
                goto out;
        }
        if (slot->ds_changed_samples) {
                slot->ds_changed_samples = 0;
                slot->ds_equal_samples = 0;
        }
out:
        spin_unlock(&o2hb_live_lock);

        o2hb_run_event_list(&event);

        o2nm_node_put(node);
        return changed;
}
/* This could be faster if we just implemented a find_last_bit, but I
 * don't think the circumstances warrant it. */
static int o2hb_highest_node(unsigned long *nodes,
                             int numbits)
{
        int highest, node;

        highest = numbits;
        node = -1;
        while ((node = find_next_bit(nodes, numbits, node + 1)) != -1) {
                if (node >= numbits)
                        break;

                highest = node;
        }

        return highest;
}

static void o2hb_do_disk_heartbeat(struct o2hb_region *reg)
{
        int i, ret, highest_node, change = 0;
        unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
        struct bio *write_bio;
        struct o2hb_bio_wait_ctxt write_wc;

        if (o2nm_configured_node_map(configured_nodes, sizeof(configured_nodes)))
                return;

        highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES);
        if (highest_node >= O2NM_MAX_NODES) {
                mlog(ML_NOTICE, "ocfs2_heartbeat: no configured nodes found!\n");
                return;
        }

        /* No sense in reading the slots of nodes that don't exist
         * yet. Of course, if the node definitions have holes in them
         * then we're reading an empty slot anyway... Consider this
         * best-effort. */
        ret = o2hb_read_slots(reg, highest_node + 1);
        if (ret < 0) {
                mlog_errno(ret);
                return;
        }

        /* With an up to date view of the slots, we can check that no
         * other node has been improperly configured to heartbeat in
         * our slot. */
        if (!o2hb_check_last_timestamp(reg))
                mlog(ML_ERROR, "Device \"%s\": another node is heartbeating "
                     "in our slot!\n", reg->hr_dev_name);

        /* fill in the proper info for our next heartbeat */
        o2hb_prepare_block(reg, reg->hr_generation);

        /* And fire off the write. Note that we don't wait on this I/O
         * until later. */
        ret = o2hb_issue_node_write(reg, &write_bio, &write_wc);
        if (ret < 0) {
                mlog_errno(ret);
                return;
        }

        i = -1;
        while((i = find_next_bit(configured_nodes, O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {

                change |= o2hb_check_slot(reg, &reg->hr_slots[i]);
        }

        /*
         * We have to be sure we've advertised ourselves on disk
         * before we can go to steady state. This ensures that
         * people we find in our steady state have seen us.
         */
        o2hb_wait_on_io(reg, &write_wc);
        bio_put(write_bio);
        o2hb_arm_write_timeout(reg);

        /* let the person who launched us know when things are steady */
        if (!change && (atomic_read(&reg->hr_steady_iterations) != 0)) {
                if (atomic_dec_and_test(&reg->hr_steady_iterations))
                        wake_up(&o2hb_steady_queue);
        }
}
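/*
 * One heartbeat pass, as above: read every configured slot, make sure
 * nobody wrote over ours, stamp and asynchronously write our own
 * block, run the state machine over each slot, then wait for our
 * write before re-arming the write timeout. Steady state is only
 * reported once an iteration sees no membership change.
 */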
/* Subtract b from a, storing the result in a. a *must* have a larger
 * value than b. */
static void o2hb_tv_subtract(struct timeval *a,
                             struct timeval *b)
{
        /* just return 0 when a is before b */
        if (a->tv_sec < b->tv_sec ||
            (a->tv_sec == b->tv_sec && a->tv_usec < b->tv_usec)) {
                a->tv_sec = 0;
                a->tv_usec = 0;
                return;
        }

        a->tv_sec -= b->tv_sec;
        a->tv_usec -= b->tv_usec;
        while ( a->tv_usec < 0 ) {
                a->tv_sec--;
                a->tv_usec += 1000000;
        }
}

static unsigned int o2hb_elapsed_msecs(struct timeval *start,
                                       struct timeval *end)
{
        struct timeval res = *end;

        o2hb_tv_subtract(&res, start);

        return res.tv_sec * 1000 + res.tv_usec / 1000;
}

/*
 * we ride the region ref that the region dir holds. before the region
 * dir is removed and drops its ref it will wait to tear down this
 * thread.
 */
static int o2hb_thread(void *data)
{
        int i, ret;
        struct o2hb_region *reg = data;
        struct bio *write_bio;
        struct o2hb_bio_wait_ctxt write_wc;
        struct timeval before_hb, after_hb;
        unsigned int elapsed_msec;

        mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n");

        set_user_nice(current, -20);

        while (!kthread_should_stop() && !reg->hr_unclean_stop) {
                /* We track the time spent inside
                 * o2hb_do_disk_heartbeat so that we avoid more than
                 * hr_timeout_ms between disk writes. On busy systems
                 * this should result in a heartbeat which is less
                 * likely to time itself out. */
                do_gettimeofday(&before_hb);

                o2hb_do_disk_heartbeat(reg);

                do_gettimeofday(&after_hb);
                elapsed_msec = o2hb_elapsed_msecs(&before_hb, &after_hb);

                mlog(0, "start = %lu.%lu, end = %lu.%lu, msec = %u\n",
                     before_hb.tv_sec, (unsigned long) before_hb.tv_usec,
                     after_hb.tv_sec, (unsigned long) after_hb.tv_usec,
                     elapsed_msec);

                if (elapsed_msec < reg->hr_timeout_ms) {
                        /* the kthread api has blocked signals for us so no
                         * need to record the return value. */
                        msleep_interruptible(reg->hr_timeout_ms - elapsed_msec);
                }
        }

        o2hb_disarm_write_timeout(reg);

        /* unclean stop is only used in very bad situations */
        for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++)
                o2hb_shutdown_slot(&reg->hr_slots[i]);

        /* Explicit down notification - avoid forcing the other nodes
         * to timeout on this region when we could just as easily
         * write a clear generation - thus indicating to them that
         * this node has left this region.
         *
         * XXX: Should we skip this on unclean_stop? */
        o2hb_prepare_block(reg, 0);
        ret = o2hb_issue_node_write(reg, &write_bio, &write_wc);
        if (ret == 0) {
                o2hb_wait_on_io(reg, &write_wc);
                bio_put(write_bio);
        } else {
                mlog_errno(ret);
        }

        mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread exiting\n");

        return 0;
}

void o2hb_init(void)
{
        int i;

        for (i = 0; i < ARRAY_SIZE(o2hb_callbacks); i++)
                INIT_LIST_HEAD(&o2hb_callbacks[i].list);

        for (i = 0; i < ARRAY_SIZE(o2hb_live_slots); i++)
                INIT_LIST_HEAD(&o2hb_live_slots[i]);

        INIT_LIST_HEAD(&o2hb_node_events);

        memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap));
}

/* if we're already in a callback then we're already serialized by the sem */
static void o2hb_fill_node_map_from_callback(unsigned long *map,
                                             unsigned bytes)
{
        BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long)));

        memcpy(map, &o2hb_live_node_bitmap, bytes);
}

/*
 * get a map of all nodes that are heartbeating in any regions
 */
void o2hb_fill_node_map(unsigned long *map, unsigned bytes)
{
        /* callers want to serialize this map and callbacks so that they
         * can trust that they don't miss nodes coming to the party */
        down_read(&o2hb_callback_sem);
        spin_lock(&o2hb_live_lock);
        o2hb_fill_node_map_from_callback(map, bytes);
        spin_unlock(&o2hb_live_lock);
        up_read(&o2hb_callback_sem);
}
EXPORT_SYMBOL_GPL(o2hb_fill_node_map);
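/*
 * Rough usage sketch for the configfs interface below (the mount
 * point is illustrative): userspace creates a region directory under
 * .../cluster/<name>/heartbeat/, writes block_bytes, start_block and
 * blocks, and finally writes the number of an open file descriptor
 * for the block device to "dev", which binds the device and starts
 * the heartbeat thread.
 */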
/*
 * heartbeat configfs bits. The heartbeat set is a default set under
 * the cluster set in nodemanager.c.
 */

static struct o2hb_region *to_o2hb_region(struct config_item *item)
{
        return item ? container_of(item, struct o2hb_region, hr_item) : NULL;
}

/* drop_item only drops its ref after killing the thread, nothing should
 * be using the region anymore. this has to clean up any state that
 * attributes might have built up. */
static void o2hb_region_release(struct config_item *item)
{
        int i;
        struct page *page;
        struct o2hb_region *reg = to_o2hb_region(item);

        if (reg->hr_tmp_block)
                kfree(reg->hr_tmp_block);

        if (reg->hr_slot_data) {
                for (i = 0; i < reg->hr_num_pages; i++) {
                        page = reg->hr_slot_data[i];
                        if (page)
                                __free_page(page);
                }
                kfree(reg->hr_slot_data);
        }

        if (reg->hr_bdev)
                blkdev_put(reg->hr_bdev);

        if (reg->hr_slots)
                kfree(reg->hr_slots);

        spin_lock(&o2hb_live_lock);
        list_del(&reg->hr_all_item);
        spin_unlock(&o2hb_live_lock);

        kfree(reg);
}

static int o2hb_read_block_input(struct o2hb_region *reg,
                                 const char *page,
                                 size_t count,
                                 unsigned long *ret_bytes,
                                 unsigned int *ret_bits)
{
        unsigned long bytes;
        char *p = (char *)page;

        bytes = simple_strtoul(p, &p, 0);
        if (!p || (*p && (*p != '\n')))
                return -EINVAL;

        /* Heartbeat and fs min / max block sizes are the same. */
        if (bytes > 4096 || bytes < 512)
                return -ERANGE;
        if (hweight16(bytes) != 1)
                return -EINVAL;

        if (ret_bytes)
                *ret_bytes = bytes;
        if (ret_bits)
                *ret_bits = ffs(bytes) - 1;

        return 0;
}

static ssize_t o2hb_region_block_bytes_read(struct o2hb_region *reg,
                                            char *page)
{
        return sprintf(page, "%u\n", reg->hr_block_bytes);
}

static ssize_t o2hb_region_block_bytes_write(struct o2hb_region *reg,
                                             const char *page,
                                             size_t count)
{
        int status;
        unsigned long block_bytes;
        unsigned int block_bits;

        if (reg->hr_bdev)
                return -EINVAL;

        status = o2hb_read_block_input(reg, page, count,
                                       &block_bytes, &block_bits);
        if (status)
                return status;

        reg->hr_block_bytes = (unsigned int)block_bytes;
        reg->hr_block_bits = block_bits;

        return count;
}
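/*
 * As with block_bytes above, the start_block and blocks attributes
 * below may only be changed before a device is bound; once hr_bdev is
 * set, their stores fail with -EINVAL.
 */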
static ssize_t o2hb_region_start_block_read(struct o2hb_region *reg,
                                            char *page)
{
        return sprintf(page, "%llu\n", reg->hr_start_block);
}

static ssize_t o2hb_region_start_block_write(struct o2hb_region *reg,
                                             const char *page,
                                             size_t count)
{
        unsigned long long tmp;
        char *p = (char *)page;

        if (reg->hr_bdev)
                return -EINVAL;

        tmp = simple_strtoull(p, &p, 0);
        if (!p || (*p && (*p != '\n')))
                return -EINVAL;

        reg->hr_start_block = tmp;

        return count;
}

static ssize_t o2hb_region_blocks_read(struct o2hb_region *reg,
                                       char *page)
{
        return sprintf(page, "%d\n", reg->hr_blocks);
}

static ssize_t o2hb_region_blocks_write(struct o2hb_region *reg,
                                        const char *page,
                                        size_t count)
{
        unsigned long tmp;
        char *p = (char *)page;

        if (reg->hr_bdev)
                return -EINVAL;

        tmp = simple_strtoul(p, &p, 0);
        if (!p || (*p && (*p != '\n')))
                return -EINVAL;

        if (tmp > O2NM_MAX_NODES || tmp == 0)
                return -ERANGE;

        reg->hr_blocks = (unsigned int)tmp;

        return count;
}

static ssize_t o2hb_region_dev_read(struct o2hb_region *reg,
                                    char *page)
{
        unsigned int ret = 0;

        if (reg->hr_bdev)
                ret = sprintf(page, "%s\n", reg->hr_dev_name);

        return ret;
}

static void o2hb_init_region_params(struct o2hb_region *reg)
{
        reg->hr_slots_per_page = PAGE_CACHE_SIZE >> reg->hr_block_bits;
        reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS;

        mlog(ML_HEARTBEAT, "hr_start_block = %llu, hr_blocks = %u\n",
             reg->hr_start_block, reg->hr_blocks);
        mlog(ML_HEARTBEAT, "hr_block_bytes = %u, hr_block_bits = %u\n",
             reg->hr_block_bytes, reg->hr_block_bits);
        mlog(ML_HEARTBEAT, "hr_timeout_ms = %u\n", reg->hr_timeout_ms);
        mlog(ML_HEARTBEAT, "dead threshold = %u\n", o2hb_dead_threshold);
}

static int o2hb_map_slot_data(struct o2hb_region *reg)
{
        int i, j;
        unsigned int last_slot;
        unsigned int spp = reg->hr_slots_per_page;
        struct page *page;
        char *raw;
        struct o2hb_disk_slot *slot;

        reg->hr_tmp_block = kmalloc(reg->hr_block_bytes, GFP_KERNEL);
        if (reg->hr_tmp_block == NULL) {
                mlog_errno(-ENOMEM);
                return -ENOMEM;
        }

        reg->hr_slots = kcalloc(reg->hr_blocks,
                                sizeof(struct o2hb_disk_slot), GFP_KERNEL);
        if (reg->hr_slots == NULL) {
                mlog_errno(-ENOMEM);
                return -ENOMEM;
        }

        for(i = 0; i < reg->hr_blocks; i++) {
                slot = &reg->hr_slots[i];
                slot->ds_node_num = i;
                INIT_LIST_HEAD(&slot->ds_live_item);
                slot->ds_raw_block = NULL;
        }

        reg->hr_num_pages = (reg->hr_blocks + spp - 1) / spp;
        mlog(ML_HEARTBEAT, "Going to require %u pages to cover %u blocks "
             "at %u blocks per page\n",
             reg->hr_num_pages, reg->hr_blocks, spp);

        reg->hr_slot_data = kcalloc(reg->hr_num_pages, sizeof(struct page *),
                                    GFP_KERNEL);
        if (!reg->hr_slot_data) {
                mlog_errno(-ENOMEM);
                return -ENOMEM;
        }

        for(i = 0; i < reg->hr_num_pages; i++) {
                page = alloc_page(GFP_KERNEL);
                if (!page) {
                        mlog_errno(-ENOMEM);
                        return -ENOMEM;
                }

                reg->hr_slot_data[i] = page;

                last_slot = i * spp;
                raw = page_address(page);
                for (j = 0;
                     (j < spp) && ((j + last_slot) < reg->hr_blocks);
                     j++) {
                        BUG_ON((j + last_slot) >= reg->hr_blocks);

                        slot = &reg->hr_slots[j + last_slot];
                        slot->ds_raw_block =
                                (struct o2hb_disk_heartbeat_block *) raw;

                        raw += reg->hr_block_bytes;
                }
        }

        return 0;
}
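/*
 * Slot layout example for the mapping above (numbers illustrative):
 * with 512-byte blocks and 4k pages, hr_slots_per_page is 8, so a
 * 254-block region needs 32 pages and slot i lands in page i / 8 at
 * byte offset (i % 8) * 512.
 */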
/* Read in all the slots available and populate the tracking
 * structures so that we can start with a baseline idea of what's
 * there. */
static int o2hb_populate_slot_data(struct o2hb_region *reg)
{
        int ret, i;
        struct o2hb_disk_slot *slot;
        struct o2hb_disk_heartbeat_block *hb_block;

        mlog_entry_void();

        ret = o2hb_read_slots(reg, reg->hr_blocks);
        if (ret) {
                mlog_errno(ret);
                goto out;
        }

        /* We only want to get an idea of the values initially in each
         * slot, so we do no verification - o2hb_check_slot will
         * actually determine if each configured slot is valid and
         * whether any values have changed. */
        for(i = 0; i < reg->hr_blocks; i++) {
                slot = &reg->hr_slots[i];
                hb_block = (struct o2hb_disk_heartbeat_block *) slot->ds_raw_block;

                /* Only fill the values that o2hb_check_slot uses to
                 * determine changing slots */
                slot->ds_last_time = le64_to_cpu(hb_block->hb_seq);
                slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
        }

out:
        mlog_exit(ret);
        return ret;
}
/* this is acting as commit; we set up all of hr_bdev and hr_task or nothing */
static ssize_t o2hb_region_dev_write(struct o2hb_region *reg,
                                     const char *page,
                                     size_t count)
{
        long fd;
        int sectsize;
        char *p = (char *)page;
        struct file *filp = NULL;
        struct inode *inode = NULL;
        ssize_t ret = -EINVAL;

        if (reg->hr_bdev)
                goto out;

        /* We can't heartbeat without having had our node number
         * configured yet. */
        if (o2nm_this_node() == O2NM_MAX_NODES)
                goto out;

        fd = simple_strtol(p, &p, 0);
        if (!p || (*p && (*p != '\n')))
                goto out;

        if (fd < 0 || fd >= INT_MAX)
                goto out;

        filp = fget(fd);
        if (filp == NULL)
                goto out;

        if (reg->hr_blocks == 0 || reg->hr_start_block == 0 ||
            reg->hr_block_bytes == 0)
                goto out;

        inode = igrab(filp->f_mapping->host);
        if (inode == NULL)
                goto out;

        if (!S_ISBLK(inode->i_mode))
                goto out;

        reg->hr_bdev = I_BDEV(filp->f_mapping->host);
        ret = blkdev_get(reg->hr_bdev, FMODE_WRITE | FMODE_READ, 0);
        if (ret) {
                reg->hr_bdev = NULL;
                goto out;
        }
        inode = NULL;

        bdevname(reg->hr_bdev, reg->hr_dev_name);

        sectsize = bdev_hardsect_size(reg->hr_bdev);
        if (sectsize != reg->hr_block_bytes) {
                mlog(ML_ERROR,
                     "blocksize %u incorrect for device, expected %d",
                     reg->hr_block_bytes, sectsize);
                ret = -EINVAL;
                goto out;
        }

        o2hb_init_region_params(reg);

        /* Generation of zero is invalid */
        do {
                get_random_bytes(&reg->hr_generation,
                                 sizeof(reg->hr_generation));
        } while (reg->hr_generation == 0);

        ret = o2hb_map_slot_data(reg);
        if (ret) {
                mlog_errno(ret);
                goto out;
        }

        ret = o2hb_populate_slot_data(reg);
        if (ret) {
                mlog_errno(ret);
                goto out;
        }

        INIT_WORK(&reg->hr_write_timeout_work, o2hb_write_timeout, reg);

        /*
         * A node is considered live after it has beat LIVE_THRESHOLD
         * times. We're not steady until we've given them a chance
         * _after_ our first read.
         */
        atomic_set(&reg->hr_steady_iterations, O2HB_LIVE_THRESHOLD + 1);

        reg->hr_task = kthread_run(o2hb_thread, reg, "o2hb-%s",
                                   reg->hr_item.ci_name);
        if (IS_ERR(reg->hr_task)) {
                ret = PTR_ERR(reg->hr_task);
                mlog_errno(ret);
                reg->hr_task = NULL;
                goto out;
        }

        ret = wait_event_interruptible(o2hb_steady_queue,
                                atomic_read(&reg->hr_steady_iterations) == 0);
        if (ret) {
                kthread_stop(reg->hr_task);
                reg->hr_task = NULL;
                goto out;
        }

        ret = count;
out:
        if (filp)
                fput(filp);
        if (inode)
                iput(inode);
        if (ret < 0) {
                if (reg->hr_bdev) {
                        blkdev_put(reg->hr_bdev);
                        reg->hr_bdev = NULL;
                }
        }
        return ret;
}

struct o2hb_region_attribute {
        struct configfs_attribute attr;
        ssize_t (*show)(struct o2hb_region *, char *);
        ssize_t (*store)(struct o2hb_region *, const char *, size_t);
};

static struct o2hb_region_attribute o2hb_region_attr_block_bytes = {
        .attr   = { .ca_owner = THIS_MODULE,
                    .ca_name = "block_bytes",
                    .ca_mode = S_IRUGO | S_IWUSR },
        .show   = o2hb_region_block_bytes_read,
        .store  = o2hb_region_block_bytes_write,
};

static struct o2hb_region_attribute o2hb_region_attr_start_block = {
        .attr   = { .ca_owner = THIS_MODULE,
                    .ca_name = "start_block",
                    .ca_mode = S_IRUGO | S_IWUSR },
        .show   = o2hb_region_start_block_read,
        .store  = o2hb_region_start_block_write,
};

static struct o2hb_region_attribute o2hb_region_attr_blocks = {
        .attr   = { .ca_owner = THIS_MODULE,
                    .ca_name = "blocks",
                    .ca_mode = S_IRUGO | S_IWUSR },
        .show   = o2hb_region_blocks_read,
        .store  = o2hb_region_blocks_write,
};

static struct o2hb_region_attribute o2hb_region_attr_dev = {
        .attr   = { .ca_owner = THIS_MODULE,
                    .ca_name = "dev",
                    .ca_mode = S_IRUGO | S_IWUSR },
        .show   = o2hb_region_dev_read,
        .store  = o2hb_region_dev_write,
};

static struct configfs_attribute *o2hb_region_attrs[] = {
        &o2hb_region_attr_block_bytes.attr,
        &o2hb_region_attr_start_block.attr,
        &o2hb_region_attr_blocks.attr,
        &o2hb_region_attr_dev.attr,
        NULL,
};

static ssize_t o2hb_region_show(struct config_item *item,
                                struct configfs_attribute *attr,
                                char *page)
{
        struct o2hb_region *reg = to_o2hb_region(item);
        struct o2hb_region_attribute *o2hb_region_attr =
                container_of(attr, struct o2hb_region_attribute, attr);
        ssize_t ret = 0;

        if (o2hb_region_attr->show)
                ret = o2hb_region_attr->show(reg, page);
        return ret;
}

static ssize_t o2hb_region_store(struct config_item *item,
                                 struct configfs_attribute *attr,
                                 const char *page, size_t count)
{
        struct o2hb_region *reg = to_o2hb_region(item);
        struct o2hb_region_attribute *o2hb_region_attr =
                container_of(attr, struct o2hb_region_attribute, attr);
        ssize_t ret = -EINVAL;

        if (o2hb_region_attr->store)
                ret = o2hb_region_attr->store(reg, page, count);
        return ret;
}

static struct configfs_item_operations o2hb_region_item_ops = {
        .release                = o2hb_region_release,
        .show_attribute         = o2hb_region_show,
        .store_attribute        = o2hb_region_store,
};

static struct config_item_type o2hb_region_type = {
        .ct_item_ops    = &o2hb_region_item_ops,
        .ct_attrs       = o2hb_region_attrs,
        .ct_owner       = THIS_MODULE,
};
/* heartbeat set */

struct o2hb_heartbeat_group {
        struct config_group hs_group;
        /* some stuff? */
};

static struct o2hb_heartbeat_group *to_o2hb_heartbeat_group(struct config_group *group)
{
        return group ?
                container_of(group, struct o2hb_heartbeat_group, hs_group)
                : NULL;
}

static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *group,
                                                          const char *name)
{
        struct o2hb_region *reg = NULL;
        struct config_item *ret = NULL;

        reg = kcalloc(1, sizeof(struct o2hb_region), GFP_KERNEL);
        if (reg == NULL)
                goto out; /* ENOMEM */

        config_item_init_type_name(&reg->hr_item, name, &o2hb_region_type);

        ret = &reg->hr_item;

        spin_lock(&o2hb_live_lock);
        list_add_tail(&reg->hr_all_item, &o2hb_all_regions);
        spin_unlock(&o2hb_live_lock);
out:
        if (ret == NULL)
                kfree(reg);

        return ret;
}

static void o2hb_heartbeat_group_drop_item(struct config_group *group,
                                           struct config_item *item)
{
        struct o2hb_region *reg = to_o2hb_region(item);

        /* stop the thread when the user removes the region dir */
        if (reg->hr_task) {
                kthread_stop(reg->hr_task);
                reg->hr_task = NULL;
        }

        config_item_put(item);
}

struct o2hb_heartbeat_group_attribute {
        struct configfs_attribute attr;
        ssize_t (*show)(struct o2hb_heartbeat_group *, char *);
        ssize_t (*store)(struct o2hb_heartbeat_group *, const char *, size_t);
};

static ssize_t o2hb_heartbeat_group_show(struct config_item *item,
                                         struct configfs_attribute *attr,
                                         char *page)
{
        struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item));
        struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr =
                container_of(attr, struct o2hb_heartbeat_group_attribute, attr);
        ssize_t ret = 0;

        if (o2hb_heartbeat_group_attr->show)
                ret = o2hb_heartbeat_group_attr->show(reg, page);
        return ret;
}

static ssize_t o2hb_heartbeat_group_store(struct config_item *item,
                                          struct configfs_attribute *attr,
                                          const char *page, size_t count)
{
        struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item));
        struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr =
                container_of(attr, struct o2hb_heartbeat_group_attribute, attr);
        ssize_t ret = -EINVAL;

        if (o2hb_heartbeat_group_attr->store)
                ret = o2hb_heartbeat_group_attr->store(reg, page, count);
        return ret;
}

static ssize_t o2hb_heartbeat_group_threshold_show(struct o2hb_heartbeat_group *group,
                                                   char *page)
{
        return sprintf(page, "%u\n", o2hb_dead_threshold);
}

static ssize_t o2hb_heartbeat_group_threshold_store(struct o2hb_heartbeat_group *group,
                                                    const char *page,
                                                    size_t count)
{
        unsigned long tmp;
        char *p = (char *)page;

        tmp = simple_strtoul(p, &p, 10);
        if (!p || (*p && (*p != '\n')))
                return -EINVAL;

        /* this will validate ranges for us. */
        o2hb_dead_threshold_set((unsigned int) tmp);

        return count;
}
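/*
 * Note that o2hb_dead_threshold_set() silently ignores the write when
 * the value is not above O2HB_MIN_DEAD_THRESHOLD or when any region is
 * already active, so the threshold can only be tuned before heartbeat
 * starts.
 */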
static struct o2hb_heartbeat_group_attribute o2hb_heartbeat_group_attr_threshold = {
        .attr   = { .ca_owner = THIS_MODULE,
                    .ca_name = "dead_threshold",
                    .ca_mode = S_IRUGO | S_IWUSR },
        .show   = o2hb_heartbeat_group_threshold_show,
        .store  = o2hb_heartbeat_group_threshold_store,
};

static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = {
        &o2hb_heartbeat_group_attr_threshold.attr,
        NULL,
};

static struct configfs_item_operations o2hb_heartbeat_group_item_ops = {
        .show_attribute         = o2hb_heartbeat_group_show,
        .store_attribute        = o2hb_heartbeat_group_store,
};

static struct configfs_group_operations o2hb_heartbeat_group_group_ops = {
        .make_item      = o2hb_heartbeat_group_make_item,
        .drop_item      = o2hb_heartbeat_group_drop_item,
};

static struct config_item_type o2hb_heartbeat_group_type = {
        .ct_group_ops   = &o2hb_heartbeat_group_group_ops,
        .ct_item_ops    = &o2hb_heartbeat_group_item_ops,
        .ct_attrs       = o2hb_heartbeat_group_attrs,
        .ct_owner       = THIS_MODULE,
};

/* this is just here to avoid touching group in heartbeat.h which the
 * entire damn world #includes */
struct config_group *o2hb_alloc_hb_set(void)
{
        struct o2hb_heartbeat_group *hs = NULL;
        struct config_group *ret = NULL;

        hs = kcalloc(1, sizeof(struct o2hb_heartbeat_group), GFP_KERNEL);
        if (hs == NULL)
                goto out;

        config_group_init_type_name(&hs->hs_group, "heartbeat",
                                    &o2hb_heartbeat_group_type);

        ret = &hs->hs_group;
out:
        if (ret == NULL)
                kfree(hs);
        return ret;
}

void o2hb_free_hb_set(struct config_group *group)
{
        struct o2hb_heartbeat_group *hs = to_o2hb_heartbeat_group(group);
        kfree(hs);
}

/* hb callback registration and issuing */

static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type)
{
        if (type == O2HB_NUM_CB)
                return ERR_PTR(-EINVAL);

        return &o2hb_callbacks[type];
}

void o2hb_setup_callback(struct o2hb_callback_func *hc,
                         enum o2hb_callback_type type,
                         o2hb_cb_func *func,
                         void *data,
                         int priority)
{
        INIT_LIST_HEAD(&hc->hc_item);
        hc->hc_func = func;
        hc->hc_data = data;
        hc->hc_priority = priority;
        hc->hc_type = type;
        hc->hc_magic = O2HB_CB_MAGIC;
}
EXPORT_SYMBOL_GPL(o2hb_setup_callback);

int o2hb_register_callback(struct o2hb_callback_func *hc)
{
        struct o2hb_callback_func *tmp;
        struct list_head *iter;
        struct o2hb_callback *hbcall;
        int ret;

        BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
        BUG_ON(!list_empty(&hc->hc_item));

        hbcall = hbcall_from_type(hc->hc_type);
        if (IS_ERR(hbcall)) {
                ret = PTR_ERR(hbcall);
                goto out;
        }

        down_write(&o2hb_callback_sem);

        list_for_each(iter, &hbcall->list) {
                tmp = list_entry(iter, struct o2hb_callback_func, hc_item);
                if (hc->hc_priority < tmp->hc_priority) {
                        list_add_tail(&hc->hc_item, iter);
                        break;
                }
        }
        if (list_empty(&hc->hc_item))
                list_add_tail(&hc->hc_item, &hbcall->list);

        up_write(&o2hb_callback_sem);
        ret = 0;
out:
        mlog(ML_HEARTBEAT, "returning %d on behalf of %p for funcs %p\n",
             ret, __builtin_return_address(0), hc);
        return ret;
}
EXPORT_SYMBOL_GPL(o2hb_register_callback);
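/*
 * Callbacks are kept sorted by ascending hc_priority, so a lower
 * priority value fires earlier when o2hb_fire_callbacks() walks the
 * list in order.
 */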
int o2hb_unregister_callback(struct o2hb_callback_func *hc)
{
        BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);

        mlog(ML_HEARTBEAT, "on behalf of %p for funcs %p\n",
             __builtin_return_address(0), hc);

        if (list_empty(&hc->hc_item))
                return 0;

        down_write(&o2hb_callback_sem);

        list_del_init(&hc->hc_item);

        up_write(&o2hb_callback_sem);

        return 0;
}
EXPORT_SYMBOL_GPL(o2hb_unregister_callback);

int o2hb_check_node_heartbeating(u8 node_num)
{
        unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];

        o2hb_fill_node_map(testing_map, sizeof(testing_map));
        if (!test_bit(node_num, testing_map)) {
                mlog(ML_HEARTBEAT,
                     "node (%u) does not have heartbeating enabled.\n",
                     node_num);
                return 0;
        }

        return 1;
}
EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating);

int o2hb_check_node_heartbeating_from_callback(u8 node_num)
{
        unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];

        o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
        if (!test_bit(node_num, testing_map)) {
                mlog(ML_HEARTBEAT,
                     "node (%u) does not have heartbeating enabled.\n",
                     node_num);
                return 0;
        }

        return 1;
}
EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_from_callback);

/* Makes sure our local node is configured with a node number, and is
 * heartbeating. */
int o2hb_check_local_node_heartbeating(void)
{
        u8 node_num;

        /* if this node was set then we have networking */
        node_num = o2nm_this_node();
        if (node_num == O2NM_MAX_NODES) {
                mlog(ML_HEARTBEAT, "this node has not been configured.\n");
                return 0;
        }

        return o2hb_check_node_heartbeating(node_num);
}
EXPORT_SYMBOL_GPL(o2hb_check_local_node_heartbeating);

/*
 * this is just a hack until we get the plumbing which flips file systems
 * read only and drops the hb ref instead of killing the node dead.
 */
void o2hb_stop_all_regions(void)
{
        struct o2hb_region *reg;

        mlog(ML_ERROR, "stopping heartbeat on all active regions.\n");

        spin_lock(&o2hb_live_lock);

        list_for_each_entry(reg, &o2hb_all_regions, hr_all_item)
                reg->hr_unclean_stop = 1;

        spin_unlock(&o2hb_live_lock);
}
EXPORT_SYMBOL_GPL(o2hb_stop_all_regions);