1 // SPDX-License-Identifier: GPL-2.0-or-later 2 /* -*- mode: c; c-basic-offset: 8; -*- 3 * vim: noexpandtab sw=8 ts=8 sts=0: 4 * 5 * Copyright (C) 2004, 2005 Oracle. All rights reserved. 6 */ 7 8 #include <linux/kernel.h> 9 #include <linux/sched.h> 10 #include <linux/jiffies.h> 11 #include <linux/module.h> 12 #include <linux/fs.h> 13 #include <linux/bio.h> 14 #include <linux/blkdev.h> 15 #include <linux/delay.h> 16 #include <linux/file.h> 17 #include <linux/kthread.h> 18 #include <linux/configfs.h> 19 #include <linux/random.h> 20 #include <linux/crc32.h> 21 #include <linux/time.h> 22 #include <linux/debugfs.h> 23 #include <linux/slab.h> 24 #include <linux/bitmap.h> 25 #include <linux/ktime.h> 26 #include "heartbeat.h" 27 #include "tcp.h" 28 #include "nodemanager.h" 29 #include "quorum.h" 30 31 #include "masklog.h" 32 33 34 /* 35 * The first heartbeat pass had one global thread that would serialize all hb 36 * callback calls. This global serializing sem should only be removed once 37 * we've made sure that all callees can deal with being called concurrently 38 * from multiple hb region threads. 39 */ 40 static DECLARE_RWSEM(o2hb_callback_sem); 41 42 /* 43 * multiple hb threads are watching multiple regions. A node is live 44 * whenever any of the threads sees activity from the node in its region. 45 */ 46 static DEFINE_SPINLOCK(o2hb_live_lock); 47 static struct list_head o2hb_live_slots[O2NM_MAX_NODES]; 48 static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; 49 static LIST_HEAD(o2hb_node_events); 50 static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue); 51 52 /* 53 * In global heartbeat, we maintain a series of region bitmaps. 54 * - o2hb_region_bitmap allows us to limit the region number to max region. 55 * - o2hb_live_region_bitmap tracks live regions (seen steady iterations). 56 * - o2hb_quorum_region_bitmap tracks live regions that have seen all nodes 57 * heartbeat on it. 58 * - o2hb_failed_region_bitmap tracks the regions that have seen io timeouts. 59 */ 60 static unsigned long o2hb_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)]; 61 static unsigned long o2hb_live_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)]; 62 static unsigned long o2hb_quorum_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)]; 63 static unsigned long o2hb_failed_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)]; 64 65 #define O2HB_DB_TYPE_LIVENODES 0 66 #define O2HB_DB_TYPE_LIVEREGIONS 1 67 #define O2HB_DB_TYPE_QUORUMREGIONS 2 68 #define O2HB_DB_TYPE_FAILEDREGIONS 3 69 #define O2HB_DB_TYPE_REGION_LIVENODES 4 70 #define O2HB_DB_TYPE_REGION_NUMBER 5 71 #define O2HB_DB_TYPE_REGION_ELAPSED_TIME 6 72 #define O2HB_DB_TYPE_REGION_PINNED 7 73 struct o2hb_debug_buf { 74 int db_type; 75 int db_size; 76 int db_len; 77 void *db_data; 78 }; 79 80 static struct o2hb_debug_buf *o2hb_db_livenodes; 81 static struct o2hb_debug_buf *o2hb_db_liveregions; 82 static struct o2hb_debug_buf *o2hb_db_quorumregions; 83 static struct o2hb_debug_buf *o2hb_db_failedregions; 84 85 #define O2HB_DEBUG_DIR "o2hb" 86 #define O2HB_DEBUG_LIVENODES "livenodes" 87 #define O2HB_DEBUG_LIVEREGIONS "live_regions" 88 #define O2HB_DEBUG_QUORUMREGIONS "quorum_regions" 89 #define O2HB_DEBUG_FAILEDREGIONS "failed_regions" 90 #define O2HB_DEBUG_REGION_NUMBER "num" 91 #define O2HB_DEBUG_REGION_ELAPSED_TIME "elapsed_time_in_ms" 92 #define O2HB_DEBUG_REGION_PINNED "pinned" 93 94 static struct dentry *o2hb_debug_dir; 95 static struct dentry *o2hb_debug_livenodes; 96 static struct dentry *o2hb_debug_liveregions; 97 static struct dentry *o2hb_debug_quorumregions; 98 static struct dentry *o2hb_debug_failedregions; 99 100 static LIST_HEAD(o2hb_all_regions); 101 102 static struct o2hb_callback { 103 struct list_head list; 104 } o2hb_callbacks[O2HB_NUM_CB]; 105 106 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type); 107 108 #define O2HB_DEFAULT_BLOCK_BITS 9 109 110 enum o2hb_heartbeat_modes { 111 O2HB_HEARTBEAT_LOCAL = 0, 112 O2HB_HEARTBEAT_GLOBAL, 113 O2HB_HEARTBEAT_NUM_MODES, 114 }; 115 116 static const char *o2hb_heartbeat_mode_desc[O2HB_HEARTBEAT_NUM_MODES] = { 117 "local", /* O2HB_HEARTBEAT_LOCAL */ 118 "global", /* O2HB_HEARTBEAT_GLOBAL */ 119 }; 120 121 unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD; 122 static unsigned int o2hb_heartbeat_mode = O2HB_HEARTBEAT_LOCAL; 123 124 /* 125 * o2hb_dependent_users tracks the number of registered callbacks that depend 126 * on heartbeat. o2net and o2dlm are two entities that register this callback. 127 * However only o2dlm depends on the heartbeat. It does not want the heartbeat 128 * to stop while a dlm domain is still active. 129 */ 130 static unsigned int o2hb_dependent_users; 131 132 /* 133 * In global heartbeat mode, all regions are pinned if there are one or more 134 * dependent users and the quorum region count is <= O2HB_PIN_CUT_OFF. All 135 * regions are unpinned if the region count exceeds the cut off or the number 136 * of dependent users falls to zero. 137 */ 138 #define O2HB_PIN_CUT_OFF 3 139 140 /* 141 * In local heartbeat mode, we assume the dlm domain name to be the same as 142 * region uuid. This is true for domains created for the file system but not 143 * necessarily true for userdlm domains. This is a known limitation. 144 * 145 * In global heartbeat mode, we pin/unpin all o2hb regions. This solution 146 * works for both file system and userdlm domains. 147 */ 148 static int o2hb_region_pin(const char *region_uuid); 149 static void o2hb_region_unpin(const char *region_uuid); 150 151 /* Only sets a new threshold if there are no active regions. 152 * 153 * No locking or otherwise interesting code is required for reading 154 * o2hb_dead_threshold as it can't change once regions are active and 155 * it's not interesting to anyone until then anyway. */ 156 static void o2hb_dead_threshold_set(unsigned int threshold) 157 { 158 if (threshold > O2HB_MIN_DEAD_THRESHOLD) { 159 spin_lock(&o2hb_live_lock); 160 if (list_empty(&o2hb_all_regions)) 161 o2hb_dead_threshold = threshold; 162 spin_unlock(&o2hb_live_lock); 163 } 164 } 165 166 static int o2hb_global_heartbeat_mode_set(unsigned int hb_mode) 167 { 168 int ret = -1; 169 170 if (hb_mode < O2HB_HEARTBEAT_NUM_MODES) { 171 spin_lock(&o2hb_live_lock); 172 if (list_empty(&o2hb_all_regions)) { 173 o2hb_heartbeat_mode = hb_mode; 174 ret = 0; 175 } 176 spin_unlock(&o2hb_live_lock); 177 } 178 179 return ret; 180 } 181 182 struct o2hb_node_event { 183 struct list_head hn_item; 184 enum o2hb_callback_type hn_event_type; 185 struct o2nm_node *hn_node; 186 int hn_node_num; 187 }; 188 189 struct o2hb_disk_slot { 190 struct o2hb_disk_heartbeat_block *ds_raw_block; 191 u8 ds_node_num; 192 u64 ds_last_time; 193 u64 ds_last_generation; 194 u16 ds_equal_samples; 195 u16 ds_changed_samples; 196 struct list_head ds_live_item; 197 }; 198 199 /* each thread owns a region.. when we're asked to tear down the region 200 * we ask the thread to stop, who cleans up the region */ 201 struct o2hb_region { 202 struct config_item hr_item; 203 204 struct list_head hr_all_item; 205 unsigned hr_unclean_stop:1, 206 hr_aborted_start:1, 207 hr_item_pinned:1, 208 hr_item_dropped:1, 209 hr_node_deleted:1; 210 211 /* protected by the hr_callback_sem */ 212 struct task_struct *hr_task; 213 214 unsigned int hr_blocks; 215 unsigned long long hr_start_block; 216 217 unsigned int hr_block_bits; 218 unsigned int hr_block_bytes; 219 220 unsigned int hr_slots_per_page; 221 unsigned int hr_num_pages; 222 223 struct page **hr_slot_data; 224 struct block_device *hr_bdev; 225 struct o2hb_disk_slot *hr_slots; 226 227 /* live node map of this region */ 228 unsigned long hr_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; 229 unsigned int hr_region_num; 230 231 struct dentry *hr_debug_dir; 232 struct dentry *hr_debug_livenodes; 233 struct dentry *hr_debug_regnum; 234 struct dentry *hr_debug_elapsed_time; 235 struct dentry *hr_debug_pinned; 236 struct o2hb_debug_buf *hr_db_livenodes; 237 struct o2hb_debug_buf *hr_db_regnum; 238 struct o2hb_debug_buf *hr_db_elapsed_time; 239 struct o2hb_debug_buf *hr_db_pinned; 240 241 /* let the person setting up hb wait for it to return until it 242 * has reached a 'steady' state. This will be fixed when we have 243 * a more complete api that doesn't lead to this sort of fragility. */ 244 atomic_t hr_steady_iterations; 245 246 /* terminate o2hb thread if it does not reach steady state 247 * (hr_steady_iterations == 0) within hr_unsteady_iterations */ 248 atomic_t hr_unsteady_iterations; 249 250 char hr_dev_name[BDEVNAME_SIZE]; 251 252 unsigned int hr_timeout_ms; 253 254 /* randomized as the region goes up and down so that a node 255 * recognizes a node going up and down in one iteration */ 256 u64 hr_generation; 257 258 struct delayed_work hr_write_timeout_work; 259 unsigned long hr_last_timeout_start; 260 261 /* negotiate timer, used to negotiate extending hb timeout. */ 262 struct delayed_work hr_nego_timeout_work; 263 unsigned long hr_nego_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; 264 265 /* Used during o2hb_check_slot to hold a copy of the block 266 * being checked because we temporarily have to zero out the 267 * crc field. */ 268 struct o2hb_disk_heartbeat_block *hr_tmp_block; 269 270 /* Message key for negotiate timeout message. */ 271 unsigned int hr_key; 272 struct list_head hr_handler_list; 273 274 /* last hb status, 0 for success, other value for error. */ 275 int hr_last_hb_status; 276 }; 277 278 struct o2hb_bio_wait_ctxt { 279 atomic_t wc_num_reqs; 280 struct completion wc_io_complete; 281 int wc_error; 282 }; 283 284 #define O2HB_NEGO_TIMEOUT_MS (O2HB_MAX_WRITE_TIMEOUT_MS/2) 285 286 enum { 287 O2HB_NEGO_TIMEOUT_MSG = 1, 288 O2HB_NEGO_APPROVE_MSG = 2, 289 }; 290 291 struct o2hb_nego_msg { 292 u8 node_num; 293 }; 294 295 static void o2hb_write_timeout(struct work_struct *work) 296 { 297 int failed, quorum; 298 struct o2hb_region *reg = 299 container_of(work, struct o2hb_region, 300 hr_write_timeout_work.work); 301 302 mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u " 303 "milliseconds\n", reg->hr_dev_name, 304 jiffies_to_msecs(jiffies - reg->hr_last_timeout_start)); 305 306 if (o2hb_global_heartbeat_active()) { 307 spin_lock(&o2hb_live_lock); 308 if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap)) 309 set_bit(reg->hr_region_num, o2hb_failed_region_bitmap); 310 failed = bitmap_weight(o2hb_failed_region_bitmap, 311 O2NM_MAX_REGIONS); 312 quorum = bitmap_weight(o2hb_quorum_region_bitmap, 313 O2NM_MAX_REGIONS); 314 spin_unlock(&o2hb_live_lock); 315 316 mlog(ML_HEARTBEAT, "Number of regions %d, failed regions %d\n", 317 quorum, failed); 318 319 /* 320 * Fence if the number of failed regions >= half the number 321 * of quorum regions 322 */ 323 if ((failed << 1) < quorum) 324 return; 325 } 326 327 o2quo_disk_timeout(); 328 } 329 330 static void o2hb_arm_timeout(struct o2hb_region *reg) 331 { 332 /* Arm writeout only after thread reaches steady state */ 333 if (atomic_read(®->hr_steady_iterations) != 0) 334 return; 335 336 mlog(ML_HEARTBEAT, "Queue write timeout for %u ms\n", 337 O2HB_MAX_WRITE_TIMEOUT_MS); 338 339 if (o2hb_global_heartbeat_active()) { 340 spin_lock(&o2hb_live_lock); 341 clear_bit(reg->hr_region_num, o2hb_failed_region_bitmap); 342 spin_unlock(&o2hb_live_lock); 343 } 344 cancel_delayed_work(®->hr_write_timeout_work); 345 schedule_delayed_work(®->hr_write_timeout_work, 346 msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS)); 347 348 cancel_delayed_work(®->hr_nego_timeout_work); 349 /* negotiate timeout must be less than write timeout. */ 350 schedule_delayed_work(®->hr_nego_timeout_work, 351 msecs_to_jiffies(O2HB_NEGO_TIMEOUT_MS)); 352 memset(reg->hr_nego_node_bitmap, 0, sizeof(reg->hr_nego_node_bitmap)); 353 } 354 355 static void o2hb_disarm_timeout(struct o2hb_region *reg) 356 { 357 cancel_delayed_work_sync(®->hr_write_timeout_work); 358 cancel_delayed_work_sync(®->hr_nego_timeout_work); 359 } 360 361 static int o2hb_send_nego_msg(int key, int type, u8 target) 362 { 363 struct o2hb_nego_msg msg; 364 int status, ret; 365 366 msg.node_num = o2nm_this_node(); 367 again: 368 ret = o2net_send_message(type, key, &msg, sizeof(msg), 369 target, &status); 370 371 if (ret == -EAGAIN || ret == -ENOMEM) { 372 msleep(100); 373 goto again; 374 } 375 376 return ret; 377 } 378 379 static void o2hb_nego_timeout(struct work_struct *work) 380 { 381 unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; 382 int master_node, i, ret; 383 struct o2hb_region *reg; 384 385 reg = container_of(work, struct o2hb_region, hr_nego_timeout_work.work); 386 /* don't negotiate timeout if last hb failed since it is very 387 * possible io failed. Should let write timeout fence self. 388 */ 389 if (reg->hr_last_hb_status) 390 return; 391 392 o2hb_fill_node_map(live_node_bitmap, sizeof(live_node_bitmap)); 393 /* lowest node as master node to make negotiate decision. */ 394 master_node = find_next_bit(live_node_bitmap, O2NM_MAX_NODES, 0); 395 396 if (master_node == o2nm_this_node()) { 397 if (!test_bit(master_node, reg->hr_nego_node_bitmap)) { 398 printk(KERN_NOTICE "o2hb: node %d hb write hung for %ds on region %s (%s).\n", 399 o2nm_this_node(), O2HB_NEGO_TIMEOUT_MS/1000, 400 config_item_name(®->hr_item), reg->hr_dev_name); 401 set_bit(master_node, reg->hr_nego_node_bitmap); 402 } 403 if (memcmp(reg->hr_nego_node_bitmap, live_node_bitmap, 404 sizeof(reg->hr_nego_node_bitmap))) { 405 /* check negotiate bitmap every second to do timeout 406 * approve decision. 407 */ 408 schedule_delayed_work(®->hr_nego_timeout_work, 409 msecs_to_jiffies(1000)); 410 411 return; 412 } 413 414 printk(KERN_NOTICE "o2hb: all nodes hb write hung, maybe region %s (%s) is down.\n", 415 config_item_name(®->hr_item), reg->hr_dev_name); 416 /* approve negotiate timeout request. */ 417 o2hb_arm_timeout(reg); 418 419 i = -1; 420 while ((i = find_next_bit(live_node_bitmap, 421 O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) { 422 if (i == master_node) 423 continue; 424 425 mlog(ML_HEARTBEAT, "send NEGO_APPROVE msg to node %d\n", i); 426 ret = o2hb_send_nego_msg(reg->hr_key, 427 O2HB_NEGO_APPROVE_MSG, i); 428 if (ret) 429 mlog(ML_ERROR, "send NEGO_APPROVE msg to node %d fail %d\n", 430 i, ret); 431 } 432 } else { 433 /* negotiate timeout with master node. */ 434 printk(KERN_NOTICE "o2hb: node %d hb write hung for %ds on region %s (%s), negotiate timeout with node %d.\n", 435 o2nm_this_node(), O2HB_NEGO_TIMEOUT_MS/1000, config_item_name(®->hr_item), 436 reg->hr_dev_name, master_node); 437 ret = o2hb_send_nego_msg(reg->hr_key, O2HB_NEGO_TIMEOUT_MSG, 438 master_node); 439 if (ret) 440 mlog(ML_ERROR, "send NEGO_TIMEOUT msg to node %d fail %d\n", 441 master_node, ret); 442 } 443 } 444 445 static int o2hb_nego_timeout_handler(struct o2net_msg *msg, u32 len, void *data, 446 void **ret_data) 447 { 448 struct o2hb_region *reg = data; 449 struct o2hb_nego_msg *nego_msg; 450 451 nego_msg = (struct o2hb_nego_msg *)msg->buf; 452 printk(KERN_NOTICE "o2hb: receive negotiate timeout message from node %d on region %s (%s).\n", 453 nego_msg->node_num, config_item_name(®->hr_item), reg->hr_dev_name); 454 if (nego_msg->node_num < O2NM_MAX_NODES) 455 set_bit(nego_msg->node_num, reg->hr_nego_node_bitmap); 456 else 457 mlog(ML_ERROR, "got nego timeout message from bad node.\n"); 458 459 return 0; 460 } 461 462 static int o2hb_nego_approve_handler(struct o2net_msg *msg, u32 len, void *data, 463 void **ret_data) 464 { 465 struct o2hb_region *reg = data; 466 467 printk(KERN_NOTICE "o2hb: negotiate timeout approved by master node on region %s (%s).\n", 468 config_item_name(®->hr_item), reg->hr_dev_name); 469 o2hb_arm_timeout(reg); 470 return 0; 471 } 472 473 static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc) 474 { 475 atomic_set(&wc->wc_num_reqs, 1); 476 init_completion(&wc->wc_io_complete); 477 wc->wc_error = 0; 478 } 479 480 /* Used in error paths too */ 481 static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc, 482 unsigned int num) 483 { 484 /* sadly atomic_sub_and_test() isn't available on all platforms. The 485 * good news is that the fast path only completes one at a time */ 486 while(num--) { 487 if (atomic_dec_and_test(&wc->wc_num_reqs)) { 488 BUG_ON(num > 0); 489 complete(&wc->wc_io_complete); 490 } 491 } 492 } 493 494 static void o2hb_wait_on_io(struct o2hb_bio_wait_ctxt *wc) 495 { 496 o2hb_bio_wait_dec(wc, 1); 497 wait_for_completion(&wc->wc_io_complete); 498 } 499 500 static void o2hb_bio_end_io(struct bio *bio) 501 { 502 struct o2hb_bio_wait_ctxt *wc = bio->bi_private; 503 504 if (bio->bi_status) { 505 mlog(ML_ERROR, "IO Error %d\n", bio->bi_status); 506 wc->wc_error = blk_status_to_errno(bio->bi_status); 507 } 508 509 o2hb_bio_wait_dec(wc, 1); 510 bio_put(bio); 511 } 512 513 /* Setup a Bio to cover I/O against num_slots slots starting at 514 * start_slot. */ 515 static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg, 516 struct o2hb_bio_wait_ctxt *wc, 517 unsigned int *current_slot, 518 unsigned int max_slots, int op, 519 int op_flags) 520 { 521 int len, current_page; 522 unsigned int vec_len, vec_start; 523 unsigned int bits = reg->hr_block_bits; 524 unsigned int spp = reg->hr_slots_per_page; 525 unsigned int cs = *current_slot; 526 struct bio *bio; 527 struct page *page; 528 529 /* Testing has shown this allocation to take long enough under 530 * GFP_KERNEL that the local node can get fenced. It would be 531 * nicest if we could pre-allocate these bios and avoid this 532 * all together. */ 533 bio = bio_alloc(GFP_ATOMIC, 16); 534 if (!bio) { 535 mlog(ML_ERROR, "Could not alloc slots BIO!\n"); 536 bio = ERR_PTR(-ENOMEM); 537 goto bail; 538 } 539 540 /* Must put everything in 512 byte sectors for the bio... */ 541 bio->bi_iter.bi_sector = (reg->hr_start_block + cs) << (bits - 9); 542 bio_set_dev(bio, reg->hr_bdev); 543 bio->bi_private = wc; 544 bio->bi_end_io = o2hb_bio_end_io; 545 bio_set_op_attrs(bio, op, op_flags); 546 547 vec_start = (cs << bits) % PAGE_SIZE; 548 while(cs < max_slots) { 549 current_page = cs / spp; 550 page = reg->hr_slot_data[current_page]; 551 552 vec_len = min(PAGE_SIZE - vec_start, 553 (max_slots-cs) * (PAGE_SIZE/spp) ); 554 555 mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n", 556 current_page, vec_len, vec_start); 557 558 len = bio_add_page(bio, page, vec_len, vec_start); 559 if (len != vec_len) break; 560 561 cs += vec_len / (PAGE_SIZE/spp); 562 vec_start = 0; 563 } 564 565 bail: 566 *current_slot = cs; 567 return bio; 568 } 569 570 static int o2hb_read_slots(struct o2hb_region *reg, 571 unsigned int begin_slot, 572 unsigned int max_slots) 573 { 574 unsigned int current_slot = begin_slot; 575 int status; 576 struct o2hb_bio_wait_ctxt wc; 577 struct bio *bio; 578 579 o2hb_bio_wait_init(&wc); 580 581 while(current_slot < max_slots) { 582 bio = o2hb_setup_one_bio(reg, &wc, ¤t_slot, max_slots, 583 REQ_OP_READ, 0); 584 if (IS_ERR(bio)) { 585 status = PTR_ERR(bio); 586 mlog_errno(status); 587 goto bail_and_wait; 588 } 589 590 atomic_inc(&wc.wc_num_reqs); 591 submit_bio(bio); 592 } 593 594 status = 0; 595 596 bail_and_wait: 597 o2hb_wait_on_io(&wc); 598 if (wc.wc_error && !status) 599 status = wc.wc_error; 600 601 return status; 602 } 603 604 static int o2hb_issue_node_write(struct o2hb_region *reg, 605 struct o2hb_bio_wait_ctxt *write_wc) 606 { 607 int status; 608 unsigned int slot; 609 struct bio *bio; 610 611 o2hb_bio_wait_init(write_wc); 612 613 slot = o2nm_this_node(); 614 615 bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1, REQ_OP_WRITE, 616 REQ_SYNC); 617 if (IS_ERR(bio)) { 618 status = PTR_ERR(bio); 619 mlog_errno(status); 620 goto bail; 621 } 622 623 atomic_inc(&write_wc->wc_num_reqs); 624 submit_bio(bio); 625 626 status = 0; 627 bail: 628 return status; 629 } 630 631 static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg, 632 struct o2hb_disk_heartbeat_block *hb_block) 633 { 634 __le32 old_cksum; 635 u32 ret; 636 637 /* We want to compute the block crc with a 0 value in the 638 * hb_cksum field. Save it off here and replace after the 639 * crc. */ 640 old_cksum = hb_block->hb_cksum; 641 hb_block->hb_cksum = 0; 642 643 ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes); 644 645 hb_block->hb_cksum = old_cksum; 646 647 return ret; 648 } 649 650 static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block) 651 { 652 mlog(ML_ERROR, "Dump slot information: seq = 0x%llx, node = %u, " 653 "cksum = 0x%x, generation 0x%llx\n", 654 (long long)le64_to_cpu(hb_block->hb_seq), 655 hb_block->hb_node, le32_to_cpu(hb_block->hb_cksum), 656 (long long)le64_to_cpu(hb_block->hb_generation)); 657 } 658 659 static int o2hb_verify_crc(struct o2hb_region *reg, 660 struct o2hb_disk_heartbeat_block *hb_block) 661 { 662 u32 read, computed; 663 664 read = le32_to_cpu(hb_block->hb_cksum); 665 computed = o2hb_compute_block_crc_le(reg, hb_block); 666 667 return read == computed; 668 } 669 670 /* 671 * Compare the slot data with what we wrote in the last iteration. 672 * If the match fails, print an appropriate error message. This is to 673 * detect errors like... another node hearting on the same slot, 674 * flaky device that is losing writes, etc. 675 * Returns 1 if check succeeds, 0 otherwise. 676 */ 677 static int o2hb_check_own_slot(struct o2hb_region *reg) 678 { 679 struct o2hb_disk_slot *slot; 680 struct o2hb_disk_heartbeat_block *hb_block; 681 char *errstr; 682 683 slot = ®->hr_slots[o2nm_this_node()]; 684 /* Don't check on our 1st timestamp */ 685 if (!slot->ds_last_time) 686 return 0; 687 688 hb_block = slot->ds_raw_block; 689 if (le64_to_cpu(hb_block->hb_seq) == slot->ds_last_time && 690 le64_to_cpu(hb_block->hb_generation) == slot->ds_last_generation && 691 hb_block->hb_node == slot->ds_node_num) 692 return 1; 693 694 #define ERRSTR1 "Another node is heartbeating on device" 695 #define ERRSTR2 "Heartbeat generation mismatch on device" 696 #define ERRSTR3 "Heartbeat sequence mismatch on device" 697 698 if (hb_block->hb_node != slot->ds_node_num) 699 errstr = ERRSTR1; 700 else if (le64_to_cpu(hb_block->hb_generation) != 701 slot->ds_last_generation) 702 errstr = ERRSTR2; 703 else 704 errstr = ERRSTR3; 705 706 mlog(ML_ERROR, "%s (%s): expected(%u:0x%llx, 0x%llx), " 707 "ondisk(%u:0x%llx, 0x%llx)\n", errstr, reg->hr_dev_name, 708 slot->ds_node_num, (unsigned long long)slot->ds_last_generation, 709 (unsigned long long)slot->ds_last_time, hb_block->hb_node, 710 (unsigned long long)le64_to_cpu(hb_block->hb_generation), 711 (unsigned long long)le64_to_cpu(hb_block->hb_seq)); 712 713 return 0; 714 } 715 716 static inline void o2hb_prepare_block(struct o2hb_region *reg, 717 u64 generation) 718 { 719 int node_num; 720 u64 cputime; 721 struct o2hb_disk_slot *slot; 722 struct o2hb_disk_heartbeat_block *hb_block; 723 724 node_num = o2nm_this_node(); 725 slot = ®->hr_slots[node_num]; 726 727 hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block; 728 memset(hb_block, 0, reg->hr_block_bytes); 729 /* TODO: time stuff */ 730 cputime = ktime_get_real_seconds(); 731 if (!cputime) 732 cputime = 1; 733 734 hb_block->hb_seq = cpu_to_le64(cputime); 735 hb_block->hb_node = node_num; 736 hb_block->hb_generation = cpu_to_le64(generation); 737 hb_block->hb_dead_ms = cpu_to_le32(o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS); 738 739 /* This step must always happen last! */ 740 hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg, 741 hb_block)); 742 743 mlog(ML_HB_BIO, "our node generation = 0x%llx, cksum = 0x%x\n", 744 (long long)generation, 745 le32_to_cpu(hb_block->hb_cksum)); 746 } 747 748 static void o2hb_fire_callbacks(struct o2hb_callback *hbcall, 749 struct o2nm_node *node, 750 int idx) 751 { 752 struct o2hb_callback_func *f; 753 754 list_for_each_entry(f, &hbcall->list, hc_item) { 755 mlog(ML_HEARTBEAT, "calling funcs %p\n", f); 756 (f->hc_func)(node, idx, f->hc_data); 757 } 758 } 759 760 /* Will run the list in order until we process the passed event */ 761 static void o2hb_run_event_list(struct o2hb_node_event *queued_event) 762 { 763 struct o2hb_callback *hbcall; 764 struct o2hb_node_event *event; 765 766 /* Holding callback sem assures we don't alter the callback 767 * lists when doing this, and serializes ourselves with other 768 * processes wanting callbacks. */ 769 down_write(&o2hb_callback_sem); 770 771 spin_lock(&o2hb_live_lock); 772 while (!list_empty(&o2hb_node_events) 773 && !list_empty(&queued_event->hn_item)) { 774 event = list_entry(o2hb_node_events.next, 775 struct o2hb_node_event, 776 hn_item); 777 list_del_init(&event->hn_item); 778 spin_unlock(&o2hb_live_lock); 779 780 mlog(ML_HEARTBEAT, "Node %s event for %d\n", 781 event->hn_event_type == O2HB_NODE_UP_CB ? "UP" : "DOWN", 782 event->hn_node_num); 783 784 hbcall = hbcall_from_type(event->hn_event_type); 785 786 /* We should *never* have gotten on to the list with a 787 * bad type... This isn't something that we should try 788 * to recover from. */ 789 BUG_ON(IS_ERR(hbcall)); 790 791 o2hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num); 792 793 spin_lock(&o2hb_live_lock); 794 } 795 spin_unlock(&o2hb_live_lock); 796 797 up_write(&o2hb_callback_sem); 798 } 799 800 static void o2hb_queue_node_event(struct o2hb_node_event *event, 801 enum o2hb_callback_type type, 802 struct o2nm_node *node, 803 int node_num) 804 { 805 assert_spin_locked(&o2hb_live_lock); 806 807 BUG_ON((!node) && (type != O2HB_NODE_DOWN_CB)); 808 809 event->hn_event_type = type; 810 event->hn_node = node; 811 event->hn_node_num = node_num; 812 813 mlog(ML_HEARTBEAT, "Queue node %s event for node %d\n", 814 type == O2HB_NODE_UP_CB ? "UP" : "DOWN", node_num); 815 816 list_add_tail(&event->hn_item, &o2hb_node_events); 817 } 818 819 static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot) 820 { 821 struct o2hb_node_event event = 822 { .hn_item = LIST_HEAD_INIT(event.hn_item), }; 823 struct o2nm_node *node; 824 int queued = 0; 825 826 node = o2nm_get_node_by_num(slot->ds_node_num); 827 if (!node) 828 return; 829 830 spin_lock(&o2hb_live_lock); 831 if (!list_empty(&slot->ds_live_item)) { 832 mlog(ML_HEARTBEAT, "Shutdown, node %d leaves region\n", 833 slot->ds_node_num); 834 835 list_del_init(&slot->ds_live_item); 836 837 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) { 838 clear_bit(slot->ds_node_num, o2hb_live_node_bitmap); 839 840 o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node, 841 slot->ds_node_num); 842 queued = 1; 843 } 844 } 845 spin_unlock(&o2hb_live_lock); 846 847 if (queued) 848 o2hb_run_event_list(&event); 849 850 o2nm_node_put(node); 851 } 852 853 static void o2hb_set_quorum_device(struct o2hb_region *reg) 854 { 855 if (!o2hb_global_heartbeat_active()) 856 return; 857 858 /* Prevent race with o2hb_heartbeat_group_drop_item() */ 859 if (kthread_should_stop()) 860 return; 861 862 /* Tag region as quorum only after thread reaches steady state */ 863 if (atomic_read(®->hr_steady_iterations) != 0) 864 return; 865 866 spin_lock(&o2hb_live_lock); 867 868 if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap)) 869 goto unlock; 870 871 /* 872 * A region can be added to the quorum only when it sees all 873 * live nodes heartbeat on it. In other words, the region has been 874 * added to all nodes. 875 */ 876 if (memcmp(reg->hr_live_node_bitmap, o2hb_live_node_bitmap, 877 sizeof(o2hb_live_node_bitmap))) 878 goto unlock; 879 880 printk(KERN_NOTICE "o2hb: Region %s (%s) is now a quorum device\n", 881 config_item_name(®->hr_item), reg->hr_dev_name); 882 883 set_bit(reg->hr_region_num, o2hb_quorum_region_bitmap); 884 885 /* 886 * If global heartbeat active, unpin all regions if the 887 * region count > CUT_OFF 888 */ 889 if (bitmap_weight(o2hb_quorum_region_bitmap, 890 O2NM_MAX_REGIONS) > O2HB_PIN_CUT_OFF) 891 o2hb_region_unpin(NULL); 892 unlock: 893 spin_unlock(&o2hb_live_lock); 894 } 895 896 static int o2hb_check_slot(struct o2hb_region *reg, 897 struct o2hb_disk_slot *slot) 898 { 899 int changed = 0, gen_changed = 0; 900 struct o2hb_node_event event = 901 { .hn_item = LIST_HEAD_INIT(event.hn_item), }; 902 struct o2nm_node *node; 903 struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block; 904 u64 cputime; 905 unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS; 906 unsigned int slot_dead_ms; 907 int tmp; 908 int queued = 0; 909 910 memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes); 911 912 /* 913 * If a node is no longer configured but is still in the livemap, we 914 * may need to clear that bit from the livemap. 915 */ 916 node = o2nm_get_node_by_num(slot->ds_node_num); 917 if (!node) { 918 spin_lock(&o2hb_live_lock); 919 tmp = test_bit(slot->ds_node_num, o2hb_live_node_bitmap); 920 spin_unlock(&o2hb_live_lock); 921 if (!tmp) 922 return 0; 923 } 924 925 if (!o2hb_verify_crc(reg, hb_block)) { 926 /* all paths from here will drop o2hb_live_lock for 927 * us. */ 928 spin_lock(&o2hb_live_lock); 929 930 /* Don't print an error on the console in this case - 931 * a freshly formatted heartbeat area will not have a 932 * crc set on it. */ 933 if (list_empty(&slot->ds_live_item)) 934 goto out; 935 936 /* The node is live but pushed out a bad crc. We 937 * consider it a transient miss but don't populate any 938 * other values as they may be junk. */ 939 mlog(ML_ERROR, "Node %d has written a bad crc to %s\n", 940 slot->ds_node_num, reg->hr_dev_name); 941 o2hb_dump_slot(hb_block); 942 943 slot->ds_equal_samples++; 944 goto fire_callbacks; 945 } 946 947 /* we don't care if these wrap.. the state transitions below 948 * clear at the right places */ 949 cputime = le64_to_cpu(hb_block->hb_seq); 950 if (slot->ds_last_time != cputime) 951 slot->ds_changed_samples++; 952 else 953 slot->ds_equal_samples++; 954 slot->ds_last_time = cputime; 955 956 /* The node changed heartbeat generations. We assume this to 957 * mean it dropped off but came back before we timed out. We 958 * want to consider it down for the time being but don't want 959 * to lose any changed_samples state we might build up to 960 * considering it live again. */ 961 if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) { 962 gen_changed = 1; 963 slot->ds_equal_samples = 0; 964 mlog(ML_HEARTBEAT, "Node %d changed generation (0x%llx " 965 "to 0x%llx)\n", slot->ds_node_num, 966 (long long)slot->ds_last_generation, 967 (long long)le64_to_cpu(hb_block->hb_generation)); 968 } 969 970 slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation); 971 972 mlog(ML_HEARTBEAT, "Slot %d gen 0x%llx cksum 0x%x " 973 "seq %llu last %llu changed %u equal %u\n", 974 slot->ds_node_num, (long long)slot->ds_last_generation, 975 le32_to_cpu(hb_block->hb_cksum), 976 (unsigned long long)le64_to_cpu(hb_block->hb_seq), 977 (unsigned long long)slot->ds_last_time, slot->ds_changed_samples, 978 slot->ds_equal_samples); 979 980 spin_lock(&o2hb_live_lock); 981 982 fire_callbacks: 983 /* dead nodes only come to life after some number of 984 * changes at any time during their dead time */ 985 if (list_empty(&slot->ds_live_item) && 986 slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) { 987 mlog(ML_HEARTBEAT, "Node %d (id 0x%llx) joined my region\n", 988 slot->ds_node_num, (long long)slot->ds_last_generation); 989 990 set_bit(slot->ds_node_num, reg->hr_live_node_bitmap); 991 992 /* first on the list generates a callback */ 993 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) { 994 mlog(ML_HEARTBEAT, "o2hb: Add node %d to live nodes " 995 "bitmap\n", slot->ds_node_num); 996 set_bit(slot->ds_node_num, o2hb_live_node_bitmap); 997 998 o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node, 999 slot->ds_node_num); 1000 1001 changed = 1; 1002 queued = 1; 1003 } 1004 1005 list_add_tail(&slot->ds_live_item, 1006 &o2hb_live_slots[slot->ds_node_num]); 1007 1008 slot->ds_equal_samples = 0; 1009 1010 /* We want to be sure that all nodes agree on the 1011 * number of milliseconds before a node will be 1012 * considered dead. The self-fencing timeout is 1013 * computed from this value, and a discrepancy might 1014 * result in heartbeat calling a node dead when it 1015 * hasn't self-fenced yet. */ 1016 slot_dead_ms = le32_to_cpu(hb_block->hb_dead_ms); 1017 if (slot_dead_ms && slot_dead_ms != dead_ms) { 1018 /* TODO: Perhaps we can fail the region here. */ 1019 mlog(ML_ERROR, "Node %d on device %s has a dead count " 1020 "of %u ms, but our count is %u ms.\n" 1021 "Please double check your configuration values " 1022 "for 'O2CB_HEARTBEAT_THRESHOLD'\n", 1023 slot->ds_node_num, reg->hr_dev_name, slot_dead_ms, 1024 dead_ms); 1025 } 1026 goto out; 1027 } 1028 1029 /* if the list is dead, we're done.. */ 1030 if (list_empty(&slot->ds_live_item)) 1031 goto out; 1032 1033 /* live nodes only go dead after enough consequtive missed 1034 * samples.. reset the missed counter whenever we see 1035 * activity */ 1036 if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) { 1037 mlog(ML_HEARTBEAT, "Node %d left my region\n", 1038 slot->ds_node_num); 1039 1040 clear_bit(slot->ds_node_num, reg->hr_live_node_bitmap); 1041 1042 /* last off the live_slot generates a callback */ 1043 list_del_init(&slot->ds_live_item); 1044 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) { 1045 mlog(ML_HEARTBEAT, "o2hb: Remove node %d from live " 1046 "nodes bitmap\n", slot->ds_node_num); 1047 clear_bit(slot->ds_node_num, o2hb_live_node_bitmap); 1048 1049 /* node can be null */ 1050 o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, 1051 node, slot->ds_node_num); 1052 1053 changed = 1; 1054 queued = 1; 1055 } 1056 1057 /* We don't clear this because the node is still 1058 * actually writing new blocks. */ 1059 if (!gen_changed) 1060 slot->ds_changed_samples = 0; 1061 goto out; 1062 } 1063 if (slot->ds_changed_samples) { 1064 slot->ds_changed_samples = 0; 1065 slot->ds_equal_samples = 0; 1066 } 1067 out: 1068 spin_unlock(&o2hb_live_lock); 1069 1070 if (queued) 1071 o2hb_run_event_list(&event); 1072 1073 if (node) 1074 o2nm_node_put(node); 1075 return changed; 1076 } 1077 1078 static int o2hb_highest_node(unsigned long *nodes, int numbits) 1079 { 1080 return find_last_bit(nodes, numbits); 1081 } 1082 1083 static int o2hb_lowest_node(unsigned long *nodes, int numbits) 1084 { 1085 return find_first_bit(nodes, numbits); 1086 } 1087 1088 static int o2hb_do_disk_heartbeat(struct o2hb_region *reg) 1089 { 1090 int i, ret, highest_node, lowest_node; 1091 int membership_change = 0, own_slot_ok = 0; 1092 unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)]; 1093 unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)]; 1094 struct o2hb_bio_wait_ctxt write_wc; 1095 1096 ret = o2nm_configured_node_map(configured_nodes, 1097 sizeof(configured_nodes)); 1098 if (ret) { 1099 mlog_errno(ret); 1100 goto bail; 1101 } 1102 1103 /* 1104 * If a node is not configured but is in the livemap, we still need 1105 * to read the slot so as to be able to remove it from the livemap. 1106 */ 1107 o2hb_fill_node_map(live_node_bitmap, sizeof(live_node_bitmap)); 1108 i = -1; 1109 while ((i = find_next_bit(live_node_bitmap, 1110 O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) { 1111 set_bit(i, configured_nodes); 1112 } 1113 1114 highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES); 1115 lowest_node = o2hb_lowest_node(configured_nodes, O2NM_MAX_NODES); 1116 if (highest_node >= O2NM_MAX_NODES || lowest_node >= O2NM_MAX_NODES) { 1117 mlog(ML_NOTICE, "o2hb: No configured nodes found!\n"); 1118 ret = -EINVAL; 1119 goto bail; 1120 } 1121 1122 /* No sense in reading the slots of nodes that don't exist 1123 * yet. Of course, if the node definitions have holes in them 1124 * then we're reading an empty slot anyway... Consider this 1125 * best-effort. */ 1126 ret = o2hb_read_slots(reg, lowest_node, highest_node + 1); 1127 if (ret < 0) { 1128 mlog_errno(ret); 1129 goto bail; 1130 } 1131 1132 /* With an up to date view of the slots, we can check that no 1133 * other node has been improperly configured to heartbeat in 1134 * our slot. */ 1135 own_slot_ok = o2hb_check_own_slot(reg); 1136 1137 /* fill in the proper info for our next heartbeat */ 1138 o2hb_prepare_block(reg, reg->hr_generation); 1139 1140 ret = o2hb_issue_node_write(reg, &write_wc); 1141 if (ret < 0) { 1142 mlog_errno(ret); 1143 goto bail; 1144 } 1145 1146 i = -1; 1147 while((i = find_next_bit(configured_nodes, 1148 O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) { 1149 membership_change |= o2hb_check_slot(reg, ®->hr_slots[i]); 1150 } 1151 1152 /* 1153 * We have to be sure we've advertised ourselves on disk 1154 * before we can go to steady state. This ensures that 1155 * people we find in our steady state have seen us. 1156 */ 1157 o2hb_wait_on_io(&write_wc); 1158 if (write_wc.wc_error) { 1159 /* Do not re-arm the write timeout on I/O error - we 1160 * can't be sure that the new block ever made it to 1161 * disk */ 1162 mlog(ML_ERROR, "Write error %d on device \"%s\"\n", 1163 write_wc.wc_error, reg->hr_dev_name); 1164 ret = write_wc.wc_error; 1165 goto bail; 1166 } 1167 1168 /* Skip disarming the timeout if own slot has stale/bad data */ 1169 if (own_slot_ok) { 1170 o2hb_set_quorum_device(reg); 1171 o2hb_arm_timeout(reg); 1172 reg->hr_last_timeout_start = jiffies; 1173 } 1174 1175 bail: 1176 /* let the person who launched us know when things are steady */ 1177 if (atomic_read(®->hr_steady_iterations) != 0) { 1178 if (!ret && own_slot_ok && !membership_change) { 1179 if (atomic_dec_and_test(®->hr_steady_iterations)) 1180 wake_up(&o2hb_steady_queue); 1181 } 1182 } 1183 1184 if (atomic_read(®->hr_steady_iterations) != 0) { 1185 if (atomic_dec_and_test(®->hr_unsteady_iterations)) { 1186 printk(KERN_NOTICE "o2hb: Unable to stabilize " 1187 "heartbeart on region %s (%s)\n", 1188 config_item_name(®->hr_item), 1189 reg->hr_dev_name); 1190 atomic_set(®->hr_steady_iterations, 0); 1191 reg->hr_aborted_start = 1; 1192 wake_up(&o2hb_steady_queue); 1193 ret = -EIO; 1194 } 1195 } 1196 1197 return ret; 1198 } 1199 1200 /* 1201 * we ride the region ref that the region dir holds. before the region 1202 * dir is removed and drops it ref it will wait to tear down this 1203 * thread. 1204 */ 1205 static int o2hb_thread(void *data) 1206 { 1207 int i, ret; 1208 struct o2hb_region *reg = data; 1209 struct o2hb_bio_wait_ctxt write_wc; 1210 ktime_t before_hb, after_hb; 1211 unsigned int elapsed_msec; 1212 1213 mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n"); 1214 1215 set_user_nice(current, MIN_NICE); 1216 1217 /* Pin node */ 1218 ret = o2nm_depend_this_node(); 1219 if (ret) { 1220 mlog(ML_ERROR, "Node has been deleted, ret = %d\n", ret); 1221 reg->hr_node_deleted = 1; 1222 wake_up(&o2hb_steady_queue); 1223 return 0; 1224 } 1225 1226 while (!kthread_should_stop() && 1227 !reg->hr_unclean_stop && !reg->hr_aborted_start) { 1228 /* We track the time spent inside 1229 * o2hb_do_disk_heartbeat so that we avoid more than 1230 * hr_timeout_ms between disk writes. On busy systems 1231 * this should result in a heartbeat which is less 1232 * likely to time itself out. */ 1233 before_hb = ktime_get_real(); 1234 1235 ret = o2hb_do_disk_heartbeat(reg); 1236 reg->hr_last_hb_status = ret; 1237 1238 after_hb = ktime_get_real(); 1239 1240 elapsed_msec = (unsigned int) 1241 ktime_ms_delta(after_hb, before_hb); 1242 1243 mlog(ML_HEARTBEAT, 1244 "start = %lld, end = %lld, msec = %u, ret = %d\n", 1245 before_hb, after_hb, elapsed_msec, ret); 1246 1247 if (!kthread_should_stop() && 1248 elapsed_msec < reg->hr_timeout_ms) { 1249 /* the kthread api has blocked signals for us so no 1250 * need to record the return value. */ 1251 msleep_interruptible(reg->hr_timeout_ms - elapsed_msec); 1252 } 1253 } 1254 1255 o2hb_disarm_timeout(reg); 1256 1257 /* unclean stop is only used in very bad situation */ 1258 for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++) 1259 o2hb_shutdown_slot(®->hr_slots[i]); 1260 1261 /* Explicit down notification - avoid forcing the other nodes 1262 * to timeout on this region when we could just as easily 1263 * write a clear generation - thus indicating to them that 1264 * this node has left this region. 1265 */ 1266 if (!reg->hr_unclean_stop && !reg->hr_aborted_start) { 1267 o2hb_prepare_block(reg, 0); 1268 ret = o2hb_issue_node_write(reg, &write_wc); 1269 if (ret == 0) 1270 o2hb_wait_on_io(&write_wc); 1271 else 1272 mlog_errno(ret); 1273 } 1274 1275 /* Unpin node */ 1276 o2nm_undepend_this_node(); 1277 1278 mlog(ML_HEARTBEAT|ML_KTHREAD, "o2hb thread exiting\n"); 1279 1280 return 0; 1281 } 1282 1283 #ifdef CONFIG_DEBUG_FS 1284 static int o2hb_debug_open(struct inode *inode, struct file *file) 1285 { 1286 struct o2hb_debug_buf *db = inode->i_private; 1287 struct o2hb_region *reg; 1288 unsigned long map[BITS_TO_LONGS(O2NM_MAX_NODES)]; 1289 unsigned long lts; 1290 char *buf = NULL; 1291 int i = -1; 1292 int out = 0; 1293 1294 /* max_nodes should be the largest bitmap we pass here */ 1295 BUG_ON(sizeof(map) < db->db_size); 1296 1297 buf = kmalloc(PAGE_SIZE, GFP_KERNEL); 1298 if (!buf) 1299 goto bail; 1300 1301 switch (db->db_type) { 1302 case O2HB_DB_TYPE_LIVENODES: 1303 case O2HB_DB_TYPE_LIVEREGIONS: 1304 case O2HB_DB_TYPE_QUORUMREGIONS: 1305 case O2HB_DB_TYPE_FAILEDREGIONS: 1306 spin_lock(&o2hb_live_lock); 1307 memcpy(map, db->db_data, db->db_size); 1308 spin_unlock(&o2hb_live_lock); 1309 break; 1310 1311 case O2HB_DB_TYPE_REGION_LIVENODES: 1312 spin_lock(&o2hb_live_lock); 1313 reg = (struct o2hb_region *)db->db_data; 1314 memcpy(map, reg->hr_live_node_bitmap, db->db_size); 1315 spin_unlock(&o2hb_live_lock); 1316 break; 1317 1318 case O2HB_DB_TYPE_REGION_NUMBER: 1319 reg = (struct o2hb_region *)db->db_data; 1320 out += snprintf(buf + out, PAGE_SIZE - out, "%d\n", 1321 reg->hr_region_num); 1322 goto done; 1323 1324 case O2HB_DB_TYPE_REGION_ELAPSED_TIME: 1325 reg = (struct o2hb_region *)db->db_data; 1326 lts = reg->hr_last_timeout_start; 1327 /* If 0, it has never been set before */ 1328 if (lts) 1329 lts = jiffies_to_msecs(jiffies - lts); 1330 out += snprintf(buf + out, PAGE_SIZE - out, "%lu\n", lts); 1331 goto done; 1332 1333 case O2HB_DB_TYPE_REGION_PINNED: 1334 reg = (struct o2hb_region *)db->db_data; 1335 out += snprintf(buf + out, PAGE_SIZE - out, "%u\n", 1336 !!reg->hr_item_pinned); 1337 goto done; 1338 1339 default: 1340 goto done; 1341 } 1342 1343 while ((i = find_next_bit(map, db->db_len, i + 1)) < db->db_len) 1344 out += snprintf(buf + out, PAGE_SIZE - out, "%d ", i); 1345 out += snprintf(buf + out, PAGE_SIZE - out, "\n"); 1346 1347 done: 1348 i_size_write(inode, out); 1349 1350 file->private_data = buf; 1351 1352 return 0; 1353 bail: 1354 return -ENOMEM; 1355 } 1356 1357 static int o2hb_debug_release(struct inode *inode, struct file *file) 1358 { 1359 kfree(file->private_data); 1360 return 0; 1361 } 1362 1363 static ssize_t o2hb_debug_read(struct file *file, char __user *buf, 1364 size_t nbytes, loff_t *ppos) 1365 { 1366 return simple_read_from_buffer(buf, nbytes, ppos, file->private_data, 1367 i_size_read(file->f_mapping->host)); 1368 } 1369 #else 1370 static int o2hb_debug_open(struct inode *inode, struct file *file) 1371 { 1372 return 0; 1373 } 1374 static int o2hb_debug_release(struct inode *inode, struct file *file) 1375 { 1376 return 0; 1377 } 1378 static ssize_t o2hb_debug_read(struct file *file, char __user *buf, 1379 size_t nbytes, loff_t *ppos) 1380 { 1381 return 0; 1382 } 1383 #endif /* CONFIG_DEBUG_FS */ 1384 1385 static const struct file_operations o2hb_debug_fops = { 1386 .open = o2hb_debug_open, 1387 .release = o2hb_debug_release, 1388 .read = o2hb_debug_read, 1389 .llseek = generic_file_llseek, 1390 }; 1391 1392 void o2hb_exit(void) 1393 { 1394 debugfs_remove(o2hb_debug_failedregions); 1395 debugfs_remove(o2hb_debug_quorumregions); 1396 debugfs_remove(o2hb_debug_liveregions); 1397 debugfs_remove(o2hb_debug_livenodes); 1398 debugfs_remove(o2hb_debug_dir); 1399 kfree(o2hb_db_livenodes); 1400 kfree(o2hb_db_liveregions); 1401 kfree(o2hb_db_quorumregions); 1402 kfree(o2hb_db_failedregions); 1403 } 1404 1405 static struct dentry *o2hb_debug_create(const char *name, struct dentry *dir, 1406 struct o2hb_debug_buf **db, int db_len, 1407 int type, int size, int len, void *data) 1408 { 1409 *db = kmalloc(db_len, GFP_KERNEL); 1410 if (!*db) 1411 return NULL; 1412 1413 (*db)->db_type = type; 1414 (*db)->db_size = size; 1415 (*db)->db_len = len; 1416 (*db)->db_data = data; 1417 1418 return debugfs_create_file(name, S_IFREG|S_IRUSR, dir, *db, 1419 &o2hb_debug_fops); 1420 } 1421 1422 static int o2hb_debug_init(void) 1423 { 1424 int ret = -ENOMEM; 1425 1426 o2hb_debug_dir = debugfs_create_dir(O2HB_DEBUG_DIR, NULL); 1427 if (!o2hb_debug_dir) { 1428 mlog_errno(ret); 1429 goto bail; 1430 } 1431 1432 o2hb_debug_livenodes = o2hb_debug_create(O2HB_DEBUG_LIVENODES, 1433 o2hb_debug_dir, 1434 &o2hb_db_livenodes, 1435 sizeof(*o2hb_db_livenodes), 1436 O2HB_DB_TYPE_LIVENODES, 1437 sizeof(o2hb_live_node_bitmap), 1438 O2NM_MAX_NODES, 1439 o2hb_live_node_bitmap); 1440 if (!o2hb_debug_livenodes) { 1441 mlog_errno(ret); 1442 goto bail; 1443 } 1444 1445 o2hb_debug_liveregions = o2hb_debug_create(O2HB_DEBUG_LIVEREGIONS, 1446 o2hb_debug_dir, 1447 &o2hb_db_liveregions, 1448 sizeof(*o2hb_db_liveregions), 1449 O2HB_DB_TYPE_LIVEREGIONS, 1450 sizeof(o2hb_live_region_bitmap), 1451 O2NM_MAX_REGIONS, 1452 o2hb_live_region_bitmap); 1453 if (!o2hb_debug_liveregions) { 1454 mlog_errno(ret); 1455 goto bail; 1456 } 1457 1458 o2hb_debug_quorumregions = 1459 o2hb_debug_create(O2HB_DEBUG_QUORUMREGIONS, 1460 o2hb_debug_dir, 1461 &o2hb_db_quorumregions, 1462 sizeof(*o2hb_db_quorumregions), 1463 O2HB_DB_TYPE_QUORUMREGIONS, 1464 sizeof(o2hb_quorum_region_bitmap), 1465 O2NM_MAX_REGIONS, 1466 o2hb_quorum_region_bitmap); 1467 if (!o2hb_debug_quorumregions) { 1468 mlog_errno(ret); 1469 goto bail; 1470 } 1471 1472 o2hb_debug_failedregions = 1473 o2hb_debug_create(O2HB_DEBUG_FAILEDREGIONS, 1474 o2hb_debug_dir, 1475 &o2hb_db_failedregions, 1476 sizeof(*o2hb_db_failedregions), 1477 O2HB_DB_TYPE_FAILEDREGIONS, 1478 sizeof(o2hb_failed_region_bitmap), 1479 O2NM_MAX_REGIONS, 1480 o2hb_failed_region_bitmap); 1481 if (!o2hb_debug_failedregions) { 1482 mlog_errno(ret); 1483 goto bail; 1484 } 1485 1486 ret = 0; 1487 bail: 1488 if (ret) 1489 o2hb_exit(); 1490 1491 return ret; 1492 } 1493 1494 int o2hb_init(void) 1495 { 1496 int i; 1497 1498 for (i = 0; i < ARRAY_SIZE(o2hb_callbacks); i++) 1499 INIT_LIST_HEAD(&o2hb_callbacks[i].list); 1500 1501 for (i = 0; i < ARRAY_SIZE(o2hb_live_slots); i++) 1502 INIT_LIST_HEAD(&o2hb_live_slots[i]); 1503 1504 INIT_LIST_HEAD(&o2hb_node_events); 1505 1506 memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap)); 1507 memset(o2hb_region_bitmap, 0, sizeof(o2hb_region_bitmap)); 1508 memset(o2hb_live_region_bitmap, 0, sizeof(o2hb_live_region_bitmap)); 1509 memset(o2hb_quorum_region_bitmap, 0, sizeof(o2hb_quorum_region_bitmap)); 1510 memset(o2hb_failed_region_bitmap, 0, sizeof(o2hb_failed_region_bitmap)); 1511 1512 o2hb_dependent_users = 0; 1513 1514 return o2hb_debug_init(); 1515 } 1516 1517 /* if we're already in a callback then we're already serialized by the sem */ 1518 static void o2hb_fill_node_map_from_callback(unsigned long *map, 1519 unsigned bytes) 1520 { 1521 BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long))); 1522 1523 memcpy(map, &o2hb_live_node_bitmap, bytes); 1524 } 1525 1526 /* 1527 * get a map of all nodes that are heartbeating in any regions 1528 */ 1529 void o2hb_fill_node_map(unsigned long *map, unsigned bytes) 1530 { 1531 /* callers want to serialize this map and callbacks so that they 1532 * can trust that they don't miss nodes coming to the party */ 1533 down_read(&o2hb_callback_sem); 1534 spin_lock(&o2hb_live_lock); 1535 o2hb_fill_node_map_from_callback(map, bytes); 1536 spin_unlock(&o2hb_live_lock); 1537 up_read(&o2hb_callback_sem); 1538 } 1539 EXPORT_SYMBOL_GPL(o2hb_fill_node_map); 1540 1541 /* 1542 * heartbeat configfs bits. The heartbeat set is a default set under 1543 * the cluster set in nodemanager.c. 1544 */ 1545 1546 static struct o2hb_region *to_o2hb_region(struct config_item *item) 1547 { 1548 return item ? container_of(item, struct o2hb_region, hr_item) : NULL; 1549 } 1550 1551 /* drop_item only drops its ref after killing the thread, nothing should 1552 * be using the region anymore. this has to clean up any state that 1553 * attributes might have built up. */ 1554 static void o2hb_region_release(struct config_item *item) 1555 { 1556 int i; 1557 struct page *page; 1558 struct o2hb_region *reg = to_o2hb_region(item); 1559 1560 mlog(ML_HEARTBEAT, "hb region release (%s)\n", reg->hr_dev_name); 1561 1562 kfree(reg->hr_tmp_block); 1563 1564 if (reg->hr_slot_data) { 1565 for (i = 0; i < reg->hr_num_pages; i++) { 1566 page = reg->hr_slot_data[i]; 1567 if (page) 1568 __free_page(page); 1569 } 1570 kfree(reg->hr_slot_data); 1571 } 1572 1573 if (reg->hr_bdev) 1574 blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE); 1575 1576 kfree(reg->hr_slots); 1577 1578 debugfs_remove(reg->hr_debug_livenodes); 1579 debugfs_remove(reg->hr_debug_regnum); 1580 debugfs_remove(reg->hr_debug_elapsed_time); 1581 debugfs_remove(reg->hr_debug_pinned); 1582 debugfs_remove(reg->hr_debug_dir); 1583 kfree(reg->hr_db_livenodes); 1584 kfree(reg->hr_db_regnum); 1585 kfree(reg->hr_db_elapsed_time); 1586 kfree(reg->hr_db_pinned); 1587 1588 spin_lock(&o2hb_live_lock); 1589 list_del(®->hr_all_item); 1590 spin_unlock(&o2hb_live_lock); 1591 1592 o2net_unregister_handler_list(®->hr_handler_list); 1593 kfree(reg); 1594 } 1595 1596 static int o2hb_read_block_input(struct o2hb_region *reg, 1597 const char *page, 1598 unsigned long *ret_bytes, 1599 unsigned int *ret_bits) 1600 { 1601 unsigned long bytes; 1602 char *p = (char *)page; 1603 1604 bytes = simple_strtoul(p, &p, 0); 1605 if (!p || (*p && (*p != '\n'))) 1606 return -EINVAL; 1607 1608 /* Heartbeat and fs min / max block sizes are the same. */ 1609 if (bytes > 4096 || bytes < 512) 1610 return -ERANGE; 1611 if (hweight16(bytes) != 1) 1612 return -EINVAL; 1613 1614 if (ret_bytes) 1615 *ret_bytes = bytes; 1616 if (ret_bits) 1617 *ret_bits = ffs(bytes) - 1; 1618 1619 return 0; 1620 } 1621 1622 static ssize_t o2hb_region_block_bytes_show(struct config_item *item, 1623 char *page) 1624 { 1625 return sprintf(page, "%u\n", to_o2hb_region(item)->hr_block_bytes); 1626 } 1627 1628 static ssize_t o2hb_region_block_bytes_store(struct config_item *item, 1629 const char *page, 1630 size_t count) 1631 { 1632 struct o2hb_region *reg = to_o2hb_region(item); 1633 int status; 1634 unsigned long block_bytes; 1635 unsigned int block_bits; 1636 1637 if (reg->hr_bdev) 1638 return -EINVAL; 1639 1640 status = o2hb_read_block_input(reg, page, &block_bytes, 1641 &block_bits); 1642 if (status) 1643 return status; 1644 1645 reg->hr_block_bytes = (unsigned int)block_bytes; 1646 reg->hr_block_bits = block_bits; 1647 1648 return count; 1649 } 1650 1651 static ssize_t o2hb_region_start_block_show(struct config_item *item, 1652 char *page) 1653 { 1654 return sprintf(page, "%llu\n", to_o2hb_region(item)->hr_start_block); 1655 } 1656 1657 static ssize_t o2hb_region_start_block_store(struct config_item *item, 1658 const char *page, 1659 size_t count) 1660 { 1661 struct o2hb_region *reg = to_o2hb_region(item); 1662 unsigned long long tmp; 1663 char *p = (char *)page; 1664 1665 if (reg->hr_bdev) 1666 return -EINVAL; 1667 1668 tmp = simple_strtoull(p, &p, 0); 1669 if (!p || (*p && (*p != '\n'))) 1670 return -EINVAL; 1671 1672 reg->hr_start_block = tmp; 1673 1674 return count; 1675 } 1676 1677 static ssize_t o2hb_region_blocks_show(struct config_item *item, char *page) 1678 { 1679 return sprintf(page, "%d\n", to_o2hb_region(item)->hr_blocks); 1680 } 1681 1682 static ssize_t o2hb_region_blocks_store(struct config_item *item, 1683 const char *page, 1684 size_t count) 1685 { 1686 struct o2hb_region *reg = to_o2hb_region(item); 1687 unsigned long tmp; 1688 char *p = (char *)page; 1689 1690 if (reg->hr_bdev) 1691 return -EINVAL; 1692 1693 tmp = simple_strtoul(p, &p, 0); 1694 if (!p || (*p && (*p != '\n'))) 1695 return -EINVAL; 1696 1697 if (tmp > O2NM_MAX_NODES || tmp == 0) 1698 return -ERANGE; 1699 1700 reg->hr_blocks = (unsigned int)tmp; 1701 1702 return count; 1703 } 1704 1705 static ssize_t o2hb_region_dev_show(struct config_item *item, char *page) 1706 { 1707 unsigned int ret = 0; 1708 1709 if (to_o2hb_region(item)->hr_bdev) 1710 ret = sprintf(page, "%s\n", to_o2hb_region(item)->hr_dev_name); 1711 1712 return ret; 1713 } 1714 1715 static void o2hb_init_region_params(struct o2hb_region *reg) 1716 { 1717 reg->hr_slots_per_page = PAGE_SIZE >> reg->hr_block_bits; 1718 reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS; 1719 1720 mlog(ML_HEARTBEAT, "hr_start_block = %llu, hr_blocks = %u\n", 1721 reg->hr_start_block, reg->hr_blocks); 1722 mlog(ML_HEARTBEAT, "hr_block_bytes = %u, hr_block_bits = %u\n", 1723 reg->hr_block_bytes, reg->hr_block_bits); 1724 mlog(ML_HEARTBEAT, "hr_timeout_ms = %u\n", reg->hr_timeout_ms); 1725 mlog(ML_HEARTBEAT, "dead threshold = %u\n", o2hb_dead_threshold); 1726 } 1727 1728 static int o2hb_map_slot_data(struct o2hb_region *reg) 1729 { 1730 int i, j; 1731 unsigned int last_slot; 1732 unsigned int spp = reg->hr_slots_per_page; 1733 struct page *page; 1734 char *raw; 1735 struct o2hb_disk_slot *slot; 1736 1737 reg->hr_tmp_block = kmalloc(reg->hr_block_bytes, GFP_KERNEL); 1738 if (reg->hr_tmp_block == NULL) 1739 return -ENOMEM; 1740 1741 reg->hr_slots = kcalloc(reg->hr_blocks, 1742 sizeof(struct o2hb_disk_slot), GFP_KERNEL); 1743 if (reg->hr_slots == NULL) 1744 return -ENOMEM; 1745 1746 for(i = 0; i < reg->hr_blocks; i++) { 1747 slot = ®->hr_slots[i]; 1748 slot->ds_node_num = i; 1749 INIT_LIST_HEAD(&slot->ds_live_item); 1750 slot->ds_raw_block = NULL; 1751 } 1752 1753 reg->hr_num_pages = (reg->hr_blocks + spp - 1) / spp; 1754 mlog(ML_HEARTBEAT, "Going to require %u pages to cover %u blocks " 1755 "at %u blocks per page\n", 1756 reg->hr_num_pages, reg->hr_blocks, spp); 1757 1758 reg->hr_slot_data = kcalloc(reg->hr_num_pages, sizeof(struct page *), 1759 GFP_KERNEL); 1760 if (!reg->hr_slot_data) 1761 return -ENOMEM; 1762 1763 for(i = 0; i < reg->hr_num_pages; i++) { 1764 page = alloc_page(GFP_KERNEL); 1765 if (!page) 1766 return -ENOMEM; 1767 1768 reg->hr_slot_data[i] = page; 1769 1770 last_slot = i * spp; 1771 raw = page_address(page); 1772 for (j = 0; 1773 (j < spp) && ((j + last_slot) < reg->hr_blocks); 1774 j++) { 1775 BUG_ON((j + last_slot) >= reg->hr_blocks); 1776 1777 slot = ®->hr_slots[j + last_slot]; 1778 slot->ds_raw_block = 1779 (struct o2hb_disk_heartbeat_block *) raw; 1780 1781 raw += reg->hr_block_bytes; 1782 } 1783 } 1784 1785 return 0; 1786 } 1787 1788 /* Read in all the slots available and populate the tracking 1789 * structures so that we can start with a baseline idea of what's 1790 * there. */ 1791 static int o2hb_populate_slot_data(struct o2hb_region *reg) 1792 { 1793 int ret, i; 1794 struct o2hb_disk_slot *slot; 1795 struct o2hb_disk_heartbeat_block *hb_block; 1796 1797 ret = o2hb_read_slots(reg, 0, reg->hr_blocks); 1798 if (ret) 1799 goto out; 1800 1801 /* We only want to get an idea of the values initially in each 1802 * slot, so we do no verification - o2hb_check_slot will 1803 * actually determine if each configured slot is valid and 1804 * whether any values have changed. */ 1805 for(i = 0; i < reg->hr_blocks; i++) { 1806 slot = ®->hr_slots[i]; 1807 hb_block = (struct o2hb_disk_heartbeat_block *) slot->ds_raw_block; 1808 1809 /* Only fill the values that o2hb_check_slot uses to 1810 * determine changing slots */ 1811 slot->ds_last_time = le64_to_cpu(hb_block->hb_seq); 1812 slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation); 1813 } 1814 1815 out: 1816 return ret; 1817 } 1818 1819 /* this is acting as commit; we set up all of hr_bdev and hr_task or nothing */ 1820 static ssize_t o2hb_region_dev_store(struct config_item *item, 1821 const char *page, 1822 size_t count) 1823 { 1824 struct o2hb_region *reg = to_o2hb_region(item); 1825 struct task_struct *hb_task; 1826 long fd; 1827 int sectsize; 1828 char *p = (char *)page; 1829 struct fd f; 1830 struct inode *inode; 1831 ssize_t ret = -EINVAL; 1832 int live_threshold; 1833 1834 if (reg->hr_bdev) 1835 goto out; 1836 1837 /* We can't heartbeat without having had our node number 1838 * configured yet. */ 1839 if (o2nm_this_node() == O2NM_MAX_NODES) 1840 goto out; 1841 1842 fd = simple_strtol(p, &p, 0); 1843 if (!p || (*p && (*p != '\n'))) 1844 goto out; 1845 1846 if (fd < 0 || fd >= INT_MAX) 1847 goto out; 1848 1849 f = fdget(fd); 1850 if (f.file == NULL) 1851 goto out; 1852 1853 if (reg->hr_blocks == 0 || reg->hr_start_block == 0 || 1854 reg->hr_block_bytes == 0) 1855 goto out2; 1856 1857 inode = igrab(f.file->f_mapping->host); 1858 if (inode == NULL) 1859 goto out2; 1860 1861 if (!S_ISBLK(inode->i_mode)) 1862 goto out3; 1863 1864 reg->hr_bdev = I_BDEV(f.file->f_mapping->host); 1865 ret = blkdev_get(reg->hr_bdev, FMODE_WRITE | FMODE_READ, NULL); 1866 if (ret) { 1867 reg->hr_bdev = NULL; 1868 goto out3; 1869 } 1870 inode = NULL; 1871 1872 bdevname(reg->hr_bdev, reg->hr_dev_name); 1873 1874 sectsize = bdev_logical_block_size(reg->hr_bdev); 1875 if (sectsize != reg->hr_block_bytes) { 1876 mlog(ML_ERROR, 1877 "blocksize %u incorrect for device, expected %d", 1878 reg->hr_block_bytes, sectsize); 1879 ret = -EINVAL; 1880 goto out3; 1881 } 1882 1883 o2hb_init_region_params(reg); 1884 1885 /* Generation of zero is invalid */ 1886 do { 1887 get_random_bytes(®->hr_generation, 1888 sizeof(reg->hr_generation)); 1889 } while (reg->hr_generation == 0); 1890 1891 ret = o2hb_map_slot_data(reg); 1892 if (ret) { 1893 mlog_errno(ret); 1894 goto out3; 1895 } 1896 1897 ret = o2hb_populate_slot_data(reg); 1898 if (ret) { 1899 mlog_errno(ret); 1900 goto out3; 1901 } 1902 1903 INIT_DELAYED_WORK(®->hr_write_timeout_work, o2hb_write_timeout); 1904 INIT_DELAYED_WORK(®->hr_nego_timeout_work, o2hb_nego_timeout); 1905 1906 /* 1907 * A node is considered live after it has beat LIVE_THRESHOLD 1908 * times. We're not steady until we've given them a chance 1909 * _after_ our first read. 1910 * The default threshold is bare minimum so as to limit the delay 1911 * during mounts. For global heartbeat, the threshold doubled for the 1912 * first region. 1913 */ 1914 live_threshold = O2HB_LIVE_THRESHOLD; 1915 if (o2hb_global_heartbeat_active()) { 1916 spin_lock(&o2hb_live_lock); 1917 if (bitmap_weight(o2hb_region_bitmap, O2NM_MAX_REGIONS) == 1) 1918 live_threshold <<= 1; 1919 spin_unlock(&o2hb_live_lock); 1920 } 1921 ++live_threshold; 1922 atomic_set(®->hr_steady_iterations, live_threshold); 1923 /* unsteady_iterations is triple the steady_iterations */ 1924 atomic_set(®->hr_unsteady_iterations, (live_threshold * 3)); 1925 1926 hb_task = kthread_run(o2hb_thread, reg, "o2hb-%s", 1927 reg->hr_item.ci_name); 1928 if (IS_ERR(hb_task)) { 1929 ret = PTR_ERR(hb_task); 1930 mlog_errno(ret); 1931 goto out3; 1932 } 1933 1934 spin_lock(&o2hb_live_lock); 1935 reg->hr_task = hb_task; 1936 spin_unlock(&o2hb_live_lock); 1937 1938 ret = wait_event_interruptible(o2hb_steady_queue, 1939 atomic_read(®->hr_steady_iterations) == 0 || 1940 reg->hr_node_deleted); 1941 if (ret) { 1942 atomic_set(®->hr_steady_iterations, 0); 1943 reg->hr_aborted_start = 1; 1944 } 1945 1946 if (reg->hr_aborted_start) { 1947 ret = -EIO; 1948 goto out3; 1949 } 1950 1951 if (reg->hr_node_deleted) { 1952 ret = -EINVAL; 1953 goto out3; 1954 } 1955 1956 /* Ok, we were woken. Make sure it wasn't by drop_item() */ 1957 spin_lock(&o2hb_live_lock); 1958 hb_task = reg->hr_task; 1959 if (o2hb_global_heartbeat_active()) 1960 set_bit(reg->hr_region_num, o2hb_live_region_bitmap); 1961 spin_unlock(&o2hb_live_lock); 1962 1963 if (hb_task) 1964 ret = count; 1965 else 1966 ret = -EIO; 1967 1968 if (hb_task && o2hb_global_heartbeat_active()) 1969 printk(KERN_NOTICE "o2hb: Heartbeat started on region %s (%s)\n", 1970 config_item_name(®->hr_item), reg->hr_dev_name); 1971 1972 out3: 1973 iput(inode); 1974 out2: 1975 fdput(f); 1976 out: 1977 if (ret < 0) { 1978 if (reg->hr_bdev) { 1979 blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE); 1980 reg->hr_bdev = NULL; 1981 } 1982 } 1983 return ret; 1984 } 1985 1986 static ssize_t o2hb_region_pid_show(struct config_item *item, char *page) 1987 { 1988 struct o2hb_region *reg = to_o2hb_region(item); 1989 pid_t pid = 0; 1990 1991 spin_lock(&o2hb_live_lock); 1992 if (reg->hr_task) 1993 pid = task_pid_nr(reg->hr_task); 1994 spin_unlock(&o2hb_live_lock); 1995 1996 if (!pid) 1997 return 0; 1998 1999 return sprintf(page, "%u\n", pid); 2000 } 2001 2002 CONFIGFS_ATTR(o2hb_region_, block_bytes); 2003 CONFIGFS_ATTR(o2hb_region_, start_block); 2004 CONFIGFS_ATTR(o2hb_region_, blocks); 2005 CONFIGFS_ATTR(o2hb_region_, dev); 2006 CONFIGFS_ATTR_RO(o2hb_region_, pid); 2007 2008 static struct configfs_attribute *o2hb_region_attrs[] = { 2009 &o2hb_region_attr_block_bytes, 2010 &o2hb_region_attr_start_block, 2011 &o2hb_region_attr_blocks, 2012 &o2hb_region_attr_dev, 2013 &o2hb_region_attr_pid, 2014 NULL, 2015 }; 2016 2017 static struct configfs_item_operations o2hb_region_item_ops = { 2018 .release = o2hb_region_release, 2019 }; 2020 2021 static const struct config_item_type o2hb_region_type = { 2022 .ct_item_ops = &o2hb_region_item_ops, 2023 .ct_attrs = o2hb_region_attrs, 2024 .ct_owner = THIS_MODULE, 2025 }; 2026 2027 /* heartbeat set */ 2028 2029 struct o2hb_heartbeat_group { 2030 struct config_group hs_group; 2031 /* some stuff? */ 2032 }; 2033 2034 static struct o2hb_heartbeat_group *to_o2hb_heartbeat_group(struct config_group *group) 2035 { 2036 return group ? 2037 container_of(group, struct o2hb_heartbeat_group, hs_group) 2038 : NULL; 2039 } 2040 2041 static int o2hb_debug_region_init(struct o2hb_region *reg, struct dentry *dir) 2042 { 2043 int ret = -ENOMEM; 2044 2045 reg->hr_debug_dir = 2046 debugfs_create_dir(config_item_name(®->hr_item), dir); 2047 if (!reg->hr_debug_dir) { 2048 mlog_errno(ret); 2049 goto bail; 2050 } 2051 2052 reg->hr_debug_livenodes = 2053 o2hb_debug_create(O2HB_DEBUG_LIVENODES, 2054 reg->hr_debug_dir, 2055 &(reg->hr_db_livenodes), 2056 sizeof(*(reg->hr_db_livenodes)), 2057 O2HB_DB_TYPE_REGION_LIVENODES, 2058 sizeof(reg->hr_live_node_bitmap), 2059 O2NM_MAX_NODES, reg); 2060 if (!reg->hr_debug_livenodes) { 2061 mlog_errno(ret); 2062 goto bail; 2063 } 2064 2065 reg->hr_debug_regnum = 2066 o2hb_debug_create(O2HB_DEBUG_REGION_NUMBER, 2067 reg->hr_debug_dir, 2068 &(reg->hr_db_regnum), 2069 sizeof(*(reg->hr_db_regnum)), 2070 O2HB_DB_TYPE_REGION_NUMBER, 2071 0, O2NM_MAX_NODES, reg); 2072 if (!reg->hr_debug_regnum) { 2073 mlog_errno(ret); 2074 goto bail; 2075 } 2076 2077 reg->hr_debug_elapsed_time = 2078 o2hb_debug_create(O2HB_DEBUG_REGION_ELAPSED_TIME, 2079 reg->hr_debug_dir, 2080 &(reg->hr_db_elapsed_time), 2081 sizeof(*(reg->hr_db_elapsed_time)), 2082 O2HB_DB_TYPE_REGION_ELAPSED_TIME, 2083 0, 0, reg); 2084 if (!reg->hr_debug_elapsed_time) { 2085 mlog_errno(ret); 2086 goto bail; 2087 } 2088 2089 reg->hr_debug_pinned = 2090 o2hb_debug_create(O2HB_DEBUG_REGION_PINNED, 2091 reg->hr_debug_dir, 2092 &(reg->hr_db_pinned), 2093 sizeof(*(reg->hr_db_pinned)), 2094 O2HB_DB_TYPE_REGION_PINNED, 2095 0, 0, reg); 2096 if (!reg->hr_debug_pinned) { 2097 mlog_errno(ret); 2098 goto bail; 2099 } 2100 2101 ret = 0; 2102 bail: 2103 return ret; 2104 } 2105 2106 static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *group, 2107 const char *name) 2108 { 2109 struct o2hb_region *reg = NULL; 2110 int ret; 2111 2112 reg = kzalloc(sizeof(struct o2hb_region), GFP_KERNEL); 2113 if (reg == NULL) 2114 return ERR_PTR(-ENOMEM); 2115 2116 if (strlen(name) > O2HB_MAX_REGION_NAME_LEN) { 2117 ret = -ENAMETOOLONG; 2118 goto free; 2119 } 2120 2121 spin_lock(&o2hb_live_lock); 2122 reg->hr_region_num = 0; 2123 if (o2hb_global_heartbeat_active()) { 2124 reg->hr_region_num = find_first_zero_bit(o2hb_region_bitmap, 2125 O2NM_MAX_REGIONS); 2126 if (reg->hr_region_num >= O2NM_MAX_REGIONS) { 2127 spin_unlock(&o2hb_live_lock); 2128 ret = -EFBIG; 2129 goto free; 2130 } 2131 set_bit(reg->hr_region_num, o2hb_region_bitmap); 2132 } 2133 list_add_tail(®->hr_all_item, &o2hb_all_regions); 2134 spin_unlock(&o2hb_live_lock); 2135 2136 config_item_init_type_name(®->hr_item, name, &o2hb_region_type); 2137 2138 /* this is the same way to generate msg key as dlm, for local heartbeat, 2139 * name is also the same, so make initial crc value different to avoid 2140 * message key conflict. 2141 */ 2142 reg->hr_key = crc32_le(reg->hr_region_num + O2NM_MAX_REGIONS, 2143 name, strlen(name)); 2144 INIT_LIST_HEAD(®->hr_handler_list); 2145 ret = o2net_register_handler(O2HB_NEGO_TIMEOUT_MSG, reg->hr_key, 2146 sizeof(struct o2hb_nego_msg), 2147 o2hb_nego_timeout_handler, 2148 reg, NULL, ®->hr_handler_list); 2149 if (ret) 2150 goto free; 2151 2152 ret = o2net_register_handler(O2HB_NEGO_APPROVE_MSG, reg->hr_key, 2153 sizeof(struct o2hb_nego_msg), 2154 o2hb_nego_approve_handler, 2155 reg, NULL, ®->hr_handler_list); 2156 if (ret) 2157 goto unregister_handler; 2158 2159 ret = o2hb_debug_region_init(reg, o2hb_debug_dir); 2160 if (ret) { 2161 config_item_put(®->hr_item); 2162 goto unregister_handler; 2163 } 2164 2165 return ®->hr_item; 2166 2167 unregister_handler: 2168 o2net_unregister_handler_list(®->hr_handler_list); 2169 free: 2170 kfree(reg); 2171 return ERR_PTR(ret); 2172 } 2173 2174 static void o2hb_heartbeat_group_drop_item(struct config_group *group, 2175 struct config_item *item) 2176 { 2177 struct task_struct *hb_task; 2178 struct o2hb_region *reg = to_o2hb_region(item); 2179 int quorum_region = 0; 2180 2181 /* stop the thread when the user removes the region dir */ 2182 spin_lock(&o2hb_live_lock); 2183 hb_task = reg->hr_task; 2184 reg->hr_task = NULL; 2185 reg->hr_item_dropped = 1; 2186 spin_unlock(&o2hb_live_lock); 2187 2188 if (hb_task) 2189 kthread_stop(hb_task); 2190 2191 if (o2hb_global_heartbeat_active()) { 2192 spin_lock(&o2hb_live_lock); 2193 clear_bit(reg->hr_region_num, o2hb_region_bitmap); 2194 clear_bit(reg->hr_region_num, o2hb_live_region_bitmap); 2195 if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap)) 2196 quorum_region = 1; 2197 clear_bit(reg->hr_region_num, o2hb_quorum_region_bitmap); 2198 spin_unlock(&o2hb_live_lock); 2199 printk(KERN_NOTICE "o2hb: Heartbeat %s on region %s (%s)\n", 2200 ((atomic_read(®->hr_steady_iterations) == 0) ? 2201 "stopped" : "start aborted"), config_item_name(item), 2202 reg->hr_dev_name); 2203 } 2204 2205 /* 2206 * If we're racing a dev_write(), we need to wake them. They will 2207 * check reg->hr_task 2208 */ 2209 if (atomic_read(®->hr_steady_iterations) != 0) { 2210 reg->hr_aborted_start = 1; 2211 atomic_set(®->hr_steady_iterations, 0); 2212 wake_up(&o2hb_steady_queue); 2213 } 2214 2215 config_item_put(item); 2216 2217 if (!o2hb_global_heartbeat_active() || !quorum_region) 2218 return; 2219 2220 /* 2221 * If global heartbeat active and there are dependent users, 2222 * pin all regions if quorum region count <= CUT_OFF 2223 */ 2224 spin_lock(&o2hb_live_lock); 2225 2226 if (!o2hb_dependent_users) 2227 goto unlock; 2228 2229 if (bitmap_weight(o2hb_quorum_region_bitmap, 2230 O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF) 2231 o2hb_region_pin(NULL); 2232 2233 unlock: 2234 spin_unlock(&o2hb_live_lock); 2235 } 2236 2237 static ssize_t o2hb_heartbeat_group_dead_threshold_show(struct config_item *item, 2238 char *page) 2239 { 2240 return sprintf(page, "%u\n", o2hb_dead_threshold); 2241 } 2242 2243 static ssize_t o2hb_heartbeat_group_dead_threshold_store(struct config_item *item, 2244 const char *page, size_t count) 2245 { 2246 unsigned long tmp; 2247 char *p = (char *)page; 2248 2249 tmp = simple_strtoul(p, &p, 10); 2250 if (!p || (*p && (*p != '\n'))) 2251 return -EINVAL; 2252 2253 /* this will validate ranges for us. */ 2254 o2hb_dead_threshold_set((unsigned int) tmp); 2255 2256 return count; 2257 } 2258 2259 static ssize_t o2hb_heartbeat_group_mode_show(struct config_item *item, 2260 char *page) 2261 { 2262 return sprintf(page, "%s\n", 2263 o2hb_heartbeat_mode_desc[o2hb_heartbeat_mode]); 2264 } 2265 2266 static ssize_t o2hb_heartbeat_group_mode_store(struct config_item *item, 2267 const char *page, size_t count) 2268 { 2269 unsigned int i; 2270 int ret; 2271 size_t len; 2272 2273 len = (page[count - 1] == '\n') ? count - 1 : count; 2274 if (!len) 2275 return -EINVAL; 2276 2277 for (i = 0; i < O2HB_HEARTBEAT_NUM_MODES; ++i) { 2278 if (strncasecmp(page, o2hb_heartbeat_mode_desc[i], len)) 2279 continue; 2280 2281 ret = o2hb_global_heartbeat_mode_set(i); 2282 if (!ret) 2283 printk(KERN_NOTICE "o2hb: Heartbeat mode set to %s\n", 2284 o2hb_heartbeat_mode_desc[i]); 2285 return count; 2286 } 2287 2288 return -EINVAL; 2289 2290 } 2291 2292 CONFIGFS_ATTR(o2hb_heartbeat_group_, dead_threshold); 2293 CONFIGFS_ATTR(o2hb_heartbeat_group_, mode); 2294 2295 static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = { 2296 &o2hb_heartbeat_group_attr_dead_threshold, 2297 &o2hb_heartbeat_group_attr_mode, 2298 NULL, 2299 }; 2300 2301 static struct configfs_group_operations o2hb_heartbeat_group_group_ops = { 2302 .make_item = o2hb_heartbeat_group_make_item, 2303 .drop_item = o2hb_heartbeat_group_drop_item, 2304 }; 2305 2306 static const struct config_item_type o2hb_heartbeat_group_type = { 2307 .ct_group_ops = &o2hb_heartbeat_group_group_ops, 2308 .ct_attrs = o2hb_heartbeat_group_attrs, 2309 .ct_owner = THIS_MODULE, 2310 }; 2311 2312 /* this is just here to avoid touching group in heartbeat.h which the 2313 * entire damn world #includes */ 2314 struct config_group *o2hb_alloc_hb_set(void) 2315 { 2316 struct o2hb_heartbeat_group *hs = NULL; 2317 struct config_group *ret = NULL; 2318 2319 hs = kzalloc(sizeof(struct o2hb_heartbeat_group), GFP_KERNEL); 2320 if (hs == NULL) 2321 goto out; 2322 2323 config_group_init_type_name(&hs->hs_group, "heartbeat", 2324 &o2hb_heartbeat_group_type); 2325 2326 ret = &hs->hs_group; 2327 out: 2328 if (ret == NULL) 2329 kfree(hs); 2330 return ret; 2331 } 2332 2333 void o2hb_free_hb_set(struct config_group *group) 2334 { 2335 struct o2hb_heartbeat_group *hs = to_o2hb_heartbeat_group(group); 2336 kfree(hs); 2337 } 2338 2339 /* hb callback registration and issuing */ 2340 2341 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type) 2342 { 2343 if (type == O2HB_NUM_CB) 2344 return ERR_PTR(-EINVAL); 2345 2346 return &o2hb_callbacks[type]; 2347 } 2348 2349 void o2hb_setup_callback(struct o2hb_callback_func *hc, 2350 enum o2hb_callback_type type, 2351 o2hb_cb_func *func, 2352 void *data, 2353 int priority) 2354 { 2355 INIT_LIST_HEAD(&hc->hc_item); 2356 hc->hc_func = func; 2357 hc->hc_data = data; 2358 hc->hc_priority = priority; 2359 hc->hc_type = type; 2360 hc->hc_magic = O2HB_CB_MAGIC; 2361 } 2362 EXPORT_SYMBOL_GPL(o2hb_setup_callback); 2363 2364 /* 2365 * In local heartbeat mode, region_uuid passed matches the dlm domain name. 2366 * In global heartbeat mode, region_uuid passed is NULL. 2367 * 2368 * In local, we only pin the matching region. In global we pin all the active 2369 * regions. 2370 */ 2371 static int o2hb_region_pin(const char *region_uuid) 2372 { 2373 int ret = 0, found = 0; 2374 struct o2hb_region *reg; 2375 char *uuid; 2376 2377 assert_spin_locked(&o2hb_live_lock); 2378 2379 list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) { 2380 if (reg->hr_item_dropped) 2381 continue; 2382 2383 uuid = config_item_name(®->hr_item); 2384 2385 /* local heartbeat */ 2386 if (region_uuid) { 2387 if (strcmp(region_uuid, uuid)) 2388 continue; 2389 found = 1; 2390 } 2391 2392 if (reg->hr_item_pinned || reg->hr_item_dropped) 2393 goto skip_pin; 2394 2395 /* Ignore ENOENT only for local hb (userdlm domain) */ 2396 ret = o2nm_depend_item(®->hr_item); 2397 if (!ret) { 2398 mlog(ML_CLUSTER, "Pin region %s\n", uuid); 2399 reg->hr_item_pinned = 1; 2400 } else { 2401 if (ret == -ENOENT && found) 2402 ret = 0; 2403 else { 2404 mlog(ML_ERROR, "Pin region %s fails with %d\n", 2405 uuid, ret); 2406 break; 2407 } 2408 } 2409 skip_pin: 2410 if (found) 2411 break; 2412 } 2413 2414 return ret; 2415 } 2416 2417 /* 2418 * In local heartbeat mode, region_uuid passed matches the dlm domain name. 2419 * In global heartbeat mode, region_uuid passed is NULL. 2420 * 2421 * In local, we only unpin the matching region. In global we unpin all the 2422 * active regions. 2423 */ 2424 static void o2hb_region_unpin(const char *region_uuid) 2425 { 2426 struct o2hb_region *reg; 2427 char *uuid; 2428 int found = 0; 2429 2430 assert_spin_locked(&o2hb_live_lock); 2431 2432 list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) { 2433 if (reg->hr_item_dropped) 2434 continue; 2435 2436 uuid = config_item_name(®->hr_item); 2437 if (region_uuid) { 2438 if (strcmp(region_uuid, uuid)) 2439 continue; 2440 found = 1; 2441 } 2442 2443 if (reg->hr_item_pinned) { 2444 mlog(ML_CLUSTER, "Unpin region %s\n", uuid); 2445 o2nm_undepend_item(®->hr_item); 2446 reg->hr_item_pinned = 0; 2447 } 2448 if (found) 2449 break; 2450 } 2451 } 2452 2453 static int o2hb_region_inc_user(const char *region_uuid) 2454 { 2455 int ret = 0; 2456 2457 spin_lock(&o2hb_live_lock); 2458 2459 /* local heartbeat */ 2460 if (!o2hb_global_heartbeat_active()) { 2461 ret = o2hb_region_pin(region_uuid); 2462 goto unlock; 2463 } 2464 2465 /* 2466 * if global heartbeat active and this is the first dependent user, 2467 * pin all regions if quorum region count <= CUT_OFF 2468 */ 2469 o2hb_dependent_users++; 2470 if (o2hb_dependent_users > 1) 2471 goto unlock; 2472 2473 if (bitmap_weight(o2hb_quorum_region_bitmap, 2474 O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF) 2475 ret = o2hb_region_pin(NULL); 2476 2477 unlock: 2478 spin_unlock(&o2hb_live_lock); 2479 return ret; 2480 } 2481 2482 static void o2hb_region_dec_user(const char *region_uuid) 2483 { 2484 spin_lock(&o2hb_live_lock); 2485 2486 /* local heartbeat */ 2487 if (!o2hb_global_heartbeat_active()) { 2488 o2hb_region_unpin(region_uuid); 2489 goto unlock; 2490 } 2491 2492 /* 2493 * if global heartbeat active and there are no dependent users, 2494 * unpin all quorum regions 2495 */ 2496 o2hb_dependent_users--; 2497 if (!o2hb_dependent_users) 2498 o2hb_region_unpin(NULL); 2499 2500 unlock: 2501 spin_unlock(&o2hb_live_lock); 2502 } 2503 2504 int o2hb_register_callback(const char *region_uuid, 2505 struct o2hb_callback_func *hc) 2506 { 2507 struct o2hb_callback_func *f; 2508 struct o2hb_callback *hbcall; 2509 int ret; 2510 2511 BUG_ON(hc->hc_magic != O2HB_CB_MAGIC); 2512 BUG_ON(!list_empty(&hc->hc_item)); 2513 2514 hbcall = hbcall_from_type(hc->hc_type); 2515 if (IS_ERR(hbcall)) { 2516 ret = PTR_ERR(hbcall); 2517 goto out; 2518 } 2519 2520 if (region_uuid) { 2521 ret = o2hb_region_inc_user(region_uuid); 2522 if (ret) { 2523 mlog_errno(ret); 2524 goto out; 2525 } 2526 } 2527 2528 down_write(&o2hb_callback_sem); 2529 2530 list_for_each_entry(f, &hbcall->list, hc_item) { 2531 if (hc->hc_priority < f->hc_priority) { 2532 list_add_tail(&hc->hc_item, &f->hc_item); 2533 break; 2534 } 2535 } 2536 if (list_empty(&hc->hc_item)) 2537 list_add_tail(&hc->hc_item, &hbcall->list); 2538 2539 up_write(&o2hb_callback_sem); 2540 ret = 0; 2541 out: 2542 mlog(ML_CLUSTER, "returning %d on behalf of %p for funcs %p\n", 2543 ret, __builtin_return_address(0), hc); 2544 return ret; 2545 } 2546 EXPORT_SYMBOL_GPL(o2hb_register_callback); 2547 2548 void o2hb_unregister_callback(const char *region_uuid, 2549 struct o2hb_callback_func *hc) 2550 { 2551 BUG_ON(hc->hc_magic != O2HB_CB_MAGIC); 2552 2553 mlog(ML_CLUSTER, "on behalf of %p for funcs %p\n", 2554 __builtin_return_address(0), hc); 2555 2556 /* XXX Can this happen _with_ a region reference? */ 2557 if (list_empty(&hc->hc_item)) 2558 return; 2559 2560 if (region_uuid) 2561 o2hb_region_dec_user(region_uuid); 2562 2563 down_write(&o2hb_callback_sem); 2564 2565 list_del_init(&hc->hc_item); 2566 2567 up_write(&o2hb_callback_sem); 2568 } 2569 EXPORT_SYMBOL_GPL(o2hb_unregister_callback); 2570 2571 int o2hb_check_node_heartbeating_no_sem(u8 node_num) 2572 { 2573 unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; 2574 2575 spin_lock(&o2hb_live_lock); 2576 o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map)); 2577 spin_unlock(&o2hb_live_lock); 2578 if (!test_bit(node_num, testing_map)) { 2579 mlog(ML_HEARTBEAT, 2580 "node (%u) does not have heartbeating enabled.\n", 2581 node_num); 2582 return 0; 2583 } 2584 2585 return 1; 2586 } 2587 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_no_sem); 2588 2589 int o2hb_check_node_heartbeating_from_callback(u8 node_num) 2590 { 2591 unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)]; 2592 2593 o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map)); 2594 if (!test_bit(node_num, testing_map)) { 2595 mlog(ML_HEARTBEAT, 2596 "node (%u) does not have heartbeating enabled.\n", 2597 node_num); 2598 return 0; 2599 } 2600 2601 return 1; 2602 } 2603 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_from_callback); 2604 2605 /* 2606 * this is just a hack until we get the plumbing which flips file systems 2607 * read only and drops the hb ref instead of killing the node dead. 2608 */ 2609 void o2hb_stop_all_regions(void) 2610 { 2611 struct o2hb_region *reg; 2612 2613 mlog(ML_ERROR, "stopping heartbeat on all active regions.\n"); 2614 2615 spin_lock(&o2hb_live_lock); 2616 2617 list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) 2618 reg->hr_unclean_stop = 1; 2619 2620 spin_unlock(&o2hb_live_lock); 2621 } 2622 EXPORT_SYMBOL_GPL(o2hb_stop_all_regions); 2623 2624 int o2hb_get_all_regions(char *region_uuids, u8 max_regions) 2625 { 2626 struct o2hb_region *reg; 2627 int numregs = 0; 2628 char *p; 2629 2630 spin_lock(&o2hb_live_lock); 2631 2632 p = region_uuids; 2633 list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) { 2634 if (reg->hr_item_dropped) 2635 continue; 2636 2637 mlog(0, "Region: %s\n", config_item_name(®->hr_item)); 2638 if (numregs < max_regions) { 2639 memcpy(p, config_item_name(®->hr_item), 2640 O2HB_MAX_REGION_NAME_LEN); 2641 p += O2HB_MAX_REGION_NAME_LEN; 2642 } 2643 numregs++; 2644 } 2645 2646 spin_unlock(&o2hb_live_lock); 2647 2648 return numregs; 2649 } 2650 EXPORT_SYMBOL_GPL(o2hb_get_all_regions); 2651 2652 int o2hb_global_heartbeat_active(void) 2653 { 2654 return (o2hb_heartbeat_mode == O2HB_HEARTBEAT_GLOBAL); 2655 } 2656 EXPORT_SYMBOL(o2hb_global_heartbeat_active); 2657