xref: /linux/fs/ocfs2/cluster/heartbeat.c (revision 0883c2c06fb5bcf5b9e008270827e63c09a88c1e)
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * Copyright (C) 2004, 2005 Oracle.  All rights reserved.
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public
17  * License along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA.
20  */
21 
22 #include <linux/kernel.h>
23 #include <linux/sched.h>
24 #include <linux/jiffies.h>
25 #include <linux/module.h>
26 #include <linux/fs.h>
27 #include <linux/bio.h>
28 #include <linux/blkdev.h>
29 #include <linux/delay.h>
30 #include <linux/file.h>
31 #include <linux/kthread.h>
32 #include <linux/configfs.h>
33 #include <linux/random.h>
34 #include <linux/crc32.h>
35 #include <linux/time.h>
36 #include <linux/debugfs.h>
37 #include <linux/slab.h>
38 #include <linux/bitmap.h>
39 #include <linux/ktime.h>
40 #include "heartbeat.h"
41 #include "tcp.h"
42 #include "nodemanager.h"
43 #include "quorum.h"
44 
45 #include "masklog.h"
46 
47 
48 /*
49  * The first heartbeat pass had one global thread that would serialize all hb
50  * callback calls.  This global serializing sem should only be removed once
51  * we've made sure that all callees can deal with being called concurrently
52  * from multiple hb region threads.
53  */
54 static DECLARE_RWSEM(o2hb_callback_sem);
55 
56 /*
57  * multiple hb threads are watching multiple regions.  A node is live
58  * whenever any of the threads sees activity from the node in its region.
59  */
60 static DEFINE_SPINLOCK(o2hb_live_lock);
61 static struct list_head o2hb_live_slots[O2NM_MAX_NODES];
62 static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
63 static LIST_HEAD(o2hb_node_events);
64 static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);
65 
66 /*
67  * In global heartbeat, we maintain a series of region bitmaps.
68  * 	- o2hb_region_bitmap allows us to limit the region number to max region.
69  * 	- o2hb_live_region_bitmap tracks live regions (seen steady iterations).
70  * 	- o2hb_quorum_region_bitmap tracks live regions that have seen all nodes
71  * 		heartbeat on it.
72  * 	- o2hb_failed_region_bitmap tracks the regions that have seen io timeouts.
73  */
74 static unsigned long o2hb_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
75 static unsigned long o2hb_live_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
76 static unsigned long o2hb_quorum_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
77 static unsigned long o2hb_failed_region_bitmap[BITS_TO_LONGS(O2NM_MAX_REGIONS)];
78 
79 #define O2HB_DB_TYPE_LIVENODES		0
80 #define O2HB_DB_TYPE_LIVEREGIONS	1
81 #define O2HB_DB_TYPE_QUORUMREGIONS	2
82 #define O2HB_DB_TYPE_FAILEDREGIONS	3
83 #define O2HB_DB_TYPE_REGION_LIVENODES	4
84 #define O2HB_DB_TYPE_REGION_NUMBER	5
85 #define O2HB_DB_TYPE_REGION_ELAPSED_TIME	6
86 #define O2HB_DB_TYPE_REGION_PINNED	7
87 struct o2hb_debug_buf {
88 	int db_type;
89 	int db_size;
90 	int db_len;
91 	void *db_data;
92 };
93 
94 static struct o2hb_debug_buf *o2hb_db_livenodes;
95 static struct o2hb_debug_buf *o2hb_db_liveregions;
96 static struct o2hb_debug_buf *o2hb_db_quorumregions;
97 static struct o2hb_debug_buf *o2hb_db_failedregions;
98 
99 #define O2HB_DEBUG_DIR			"o2hb"
100 #define O2HB_DEBUG_LIVENODES		"livenodes"
101 #define O2HB_DEBUG_LIVEREGIONS		"live_regions"
102 #define O2HB_DEBUG_QUORUMREGIONS	"quorum_regions"
103 #define O2HB_DEBUG_FAILEDREGIONS	"failed_regions"
104 #define O2HB_DEBUG_REGION_NUMBER	"num"
105 #define O2HB_DEBUG_REGION_ELAPSED_TIME	"elapsed_time_in_ms"
106 #define O2HB_DEBUG_REGION_PINNED	"pinned"
107 
108 static struct dentry *o2hb_debug_dir;
109 static struct dentry *o2hb_debug_livenodes;
110 static struct dentry *o2hb_debug_liveregions;
111 static struct dentry *o2hb_debug_quorumregions;
112 static struct dentry *o2hb_debug_failedregions;
113 
114 static LIST_HEAD(o2hb_all_regions);
115 
116 static struct o2hb_callback {
117 	struct list_head list;
118 } o2hb_callbacks[O2HB_NUM_CB];
119 
120 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type);
121 
122 #define O2HB_DEFAULT_BLOCK_BITS       9
123 
124 enum o2hb_heartbeat_modes {
125 	O2HB_HEARTBEAT_LOCAL		= 0,
126 	O2HB_HEARTBEAT_GLOBAL,
127 	O2HB_HEARTBEAT_NUM_MODES,
128 };
129 
130 char *o2hb_heartbeat_mode_desc[O2HB_HEARTBEAT_NUM_MODES] = {
131 		"local",	/* O2HB_HEARTBEAT_LOCAL */
132 		"global",	/* O2HB_HEARTBEAT_GLOBAL */
133 };
134 
135 unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD;
136 unsigned int o2hb_heartbeat_mode = O2HB_HEARTBEAT_LOCAL;
137 
138 /*
139  * o2hb_dependent_users tracks the number of registered callbacks that depend
140  * on heartbeat. o2net and o2dlm are two entities that register this callback.
141  * However only o2dlm depends on the heartbeat. It does not want the heartbeat
142  * to stop while a dlm domain is still active.
143  */
144 unsigned int o2hb_dependent_users;
145 
146 /*
147  * In global heartbeat mode, all regions are pinned if there are one or more
148  * dependent users and the quorum region count is <= O2HB_PIN_CUT_OFF. All
149  * regions are unpinned if the region count exceeds the cut off or the number
150  * of dependent users falls to zero.
151  */
152 #define O2HB_PIN_CUT_OFF		3
153 
154 /*
155  * In local heartbeat mode, we assume the dlm domain name to be the same as
156  * region uuid. This is true for domains created for the file system but not
157  * necessarily true for userdlm domains. This is a known limitation.
158  *
159  * In global heartbeat mode, we pin/unpin all o2hb regions. This solution
160  * works for both file system and userdlm domains.
161  */
162 static int o2hb_region_pin(const char *region_uuid);
163 static void o2hb_region_unpin(const char *region_uuid);
164 
165 /* Only sets a new threshold if there are no active regions.
166  *
167  * No locking or otherwise interesting code is required for reading
168  * o2hb_dead_threshold as it can't change once regions are active and
169  * it's not interesting to anyone until then anyway. */
170 static void o2hb_dead_threshold_set(unsigned int threshold)
171 {
172 	if (threshold > O2HB_MIN_DEAD_THRESHOLD) {
173 		spin_lock(&o2hb_live_lock);
174 		if (list_empty(&o2hb_all_regions))
175 			o2hb_dead_threshold = threshold;
176 		spin_unlock(&o2hb_live_lock);
177 	}
178 }
179 
180 static int o2hb_global_heartbeat_mode_set(unsigned int hb_mode)
181 {
182 	int ret = -1;
183 
184 	if (hb_mode < O2HB_HEARTBEAT_NUM_MODES) {
185 		spin_lock(&o2hb_live_lock);
186 		if (list_empty(&o2hb_all_regions)) {
187 			o2hb_heartbeat_mode = hb_mode;
188 			ret = 0;
189 		}
190 		spin_unlock(&o2hb_live_lock);
191 	}
192 
193 	return ret;
194 }
195 
196 struct o2hb_node_event {
197 	struct list_head        hn_item;
198 	enum o2hb_callback_type hn_event_type;
199 	struct o2nm_node        *hn_node;
200 	int                     hn_node_num;
201 };
202 
203 struct o2hb_disk_slot {
204 	struct o2hb_disk_heartbeat_block *ds_raw_block;
205 	u8			ds_node_num;
206 	u64			ds_last_time;
207 	u64			ds_last_generation;
208 	u16			ds_equal_samples;
209 	u16			ds_changed_samples;
210 	struct list_head	ds_live_item;
211 };
212 
213 /* each thread owns a region.. when we're asked to tear down the region
214  * we ask the thread to stop, who cleans up the region */
215 struct o2hb_region {
216 	struct config_item	hr_item;
217 
218 	struct list_head	hr_all_item;
219 	unsigned		hr_unclean_stop:1,
220 				hr_aborted_start:1,
221 				hr_item_pinned:1,
222 				hr_item_dropped:1,
223 				hr_node_deleted:1;
224 
225 	/* protected by the hr_callback_sem */
226 	struct task_struct 	*hr_task;
227 
228 	unsigned int		hr_blocks;
229 	unsigned long long	hr_start_block;
230 
231 	unsigned int		hr_block_bits;
232 	unsigned int		hr_block_bytes;
233 
234 	unsigned int		hr_slots_per_page;
235 	unsigned int		hr_num_pages;
236 
237 	struct page             **hr_slot_data;
238 	struct block_device	*hr_bdev;
239 	struct o2hb_disk_slot	*hr_slots;
240 
241 	/* live node map of this region */
242 	unsigned long		hr_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
243 	unsigned int		hr_region_num;
244 
245 	struct dentry		*hr_debug_dir;
246 	struct dentry		*hr_debug_livenodes;
247 	struct dentry		*hr_debug_regnum;
248 	struct dentry		*hr_debug_elapsed_time;
249 	struct dentry		*hr_debug_pinned;
250 	struct o2hb_debug_buf	*hr_db_livenodes;
251 	struct o2hb_debug_buf	*hr_db_regnum;
252 	struct o2hb_debug_buf	*hr_db_elapsed_time;
253 	struct o2hb_debug_buf	*hr_db_pinned;
254 
255 	/* let the person setting up hb wait for it to return until it
256 	 * has reached a 'steady' state.  This will be fixed when we have
257 	 * a more complete api that doesn't lead to this sort of fragility. */
258 	atomic_t		hr_steady_iterations;
259 
260 	/* terminate o2hb thread if it does not reach steady state
261 	 * (hr_steady_iterations == 0) within hr_unsteady_iterations */
262 	atomic_t		hr_unsteady_iterations;
263 
264 	char			hr_dev_name[BDEVNAME_SIZE];
265 
266 	unsigned int		hr_timeout_ms;
267 
268 	/* randomized as the region goes up and down so that a node
269 	 * recognizes a node going up and down in one iteration */
270 	u64			hr_generation;
271 
272 	struct delayed_work	hr_write_timeout_work;
273 	unsigned long		hr_last_timeout_start;
274 
275 	/* negotiate timer, used to negotiate extending hb timeout. */
276 	struct delayed_work	hr_nego_timeout_work;
277 	unsigned long		hr_nego_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
278 
279 	/* Used during o2hb_check_slot to hold a copy of the block
280 	 * being checked because we temporarily have to zero out the
281 	 * crc field. */
282 	struct o2hb_disk_heartbeat_block *hr_tmp_block;
283 
284 	/* Message key for negotiate timeout message. */
285 	unsigned int		hr_key;
286 	struct list_head	hr_handler_list;
287 
288 	/* last hb status, 0 for success, other value for error. */
289 	int			hr_last_hb_status;
290 };
291 
292 struct o2hb_bio_wait_ctxt {
293 	atomic_t          wc_num_reqs;
294 	struct completion wc_io_complete;
295 	int               wc_error;
296 };
297 
298 #define O2HB_NEGO_TIMEOUT_MS (O2HB_MAX_WRITE_TIMEOUT_MS/2)
299 
300 enum {
301 	O2HB_NEGO_TIMEOUT_MSG = 1,
302 	O2HB_NEGO_APPROVE_MSG = 2,
303 };
304 
305 struct o2hb_nego_msg {
306 	u8 node_num;
307 };
308 
309 static void o2hb_write_timeout(struct work_struct *work)
310 {
311 	int failed, quorum;
312 	struct o2hb_region *reg =
313 		container_of(work, struct o2hb_region,
314 			     hr_write_timeout_work.work);
315 
316 	mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u "
317 	     "milliseconds\n", reg->hr_dev_name,
318 	     jiffies_to_msecs(jiffies - reg->hr_last_timeout_start));
319 
320 	if (o2hb_global_heartbeat_active()) {
321 		spin_lock(&o2hb_live_lock);
322 		if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
323 			set_bit(reg->hr_region_num, o2hb_failed_region_bitmap);
324 		failed = bitmap_weight(o2hb_failed_region_bitmap,
325 					O2NM_MAX_REGIONS);
326 		quorum = bitmap_weight(o2hb_quorum_region_bitmap,
327 					O2NM_MAX_REGIONS);
328 		spin_unlock(&o2hb_live_lock);
329 
330 		mlog(ML_HEARTBEAT, "Number of regions %d, failed regions %d\n",
331 		     quorum, failed);
332 
333 		/*
334 		 * Fence if the number of failed regions >= half the number
335 		 * of  quorum regions
336 		 */
337 		if ((failed << 1) < quorum)
338 			return;
339 	}
340 
341 	o2quo_disk_timeout();
342 }
343 
344 static void o2hb_arm_timeout(struct o2hb_region *reg)
345 {
346 	/* Arm writeout only after thread reaches steady state */
347 	if (atomic_read(&reg->hr_steady_iterations) != 0)
348 		return;
349 
350 	mlog(ML_HEARTBEAT, "Queue write timeout for %u ms\n",
351 	     O2HB_MAX_WRITE_TIMEOUT_MS);
352 
353 	if (o2hb_global_heartbeat_active()) {
354 		spin_lock(&o2hb_live_lock);
355 		clear_bit(reg->hr_region_num, o2hb_failed_region_bitmap);
356 		spin_unlock(&o2hb_live_lock);
357 	}
358 	cancel_delayed_work(&reg->hr_write_timeout_work);
359 	schedule_delayed_work(&reg->hr_write_timeout_work,
360 			      msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS));
361 
362 	cancel_delayed_work(&reg->hr_nego_timeout_work);
363 	/* negotiate timeout must be less than write timeout. */
364 	schedule_delayed_work(&reg->hr_nego_timeout_work,
365 			      msecs_to_jiffies(O2HB_NEGO_TIMEOUT_MS));
366 	memset(reg->hr_nego_node_bitmap, 0, sizeof(reg->hr_nego_node_bitmap));
367 }
368 
369 static void o2hb_disarm_timeout(struct o2hb_region *reg)
370 {
371 	cancel_delayed_work_sync(&reg->hr_write_timeout_work);
372 	cancel_delayed_work_sync(&reg->hr_nego_timeout_work);
373 }
374 
375 static int o2hb_send_nego_msg(int key, int type, u8 target)
376 {
377 	struct o2hb_nego_msg msg;
378 	int status, ret;
379 
380 	msg.node_num = o2nm_this_node();
381 again:
382 	ret = o2net_send_message(type, key, &msg, sizeof(msg),
383 			target, &status);
384 
385 	if (ret == -EAGAIN || ret == -ENOMEM) {
386 		msleep(100);
387 		goto again;
388 	}
389 
390 	return ret;
391 }
392 
393 static void o2hb_nego_timeout(struct work_struct *work)
394 {
395 	unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
396 	int master_node, i, ret;
397 	struct o2hb_region *reg;
398 
399 	reg = container_of(work, struct o2hb_region, hr_nego_timeout_work.work);
400 	/* don't negotiate timeout if last hb failed since it is very
401 	 * possible io failed. Should let write timeout fence self.
402 	 */
403 	if (reg->hr_last_hb_status)
404 		return;
405 
406 	o2hb_fill_node_map(live_node_bitmap, sizeof(live_node_bitmap));
407 	/* lowest node as master node to make negotiate decision. */
408 	master_node = find_next_bit(live_node_bitmap, O2NM_MAX_NODES, 0);
409 
410 	if (master_node == o2nm_this_node()) {
411 		if (!test_bit(master_node, reg->hr_nego_node_bitmap)) {
412 			printk(KERN_NOTICE "o2hb: node %d hb write hung for %ds on region %s (%s).\n",
413 				o2nm_this_node(), O2HB_NEGO_TIMEOUT_MS/1000,
414 				config_item_name(&reg->hr_item), reg->hr_dev_name);
415 			set_bit(master_node, reg->hr_nego_node_bitmap);
416 		}
417 		if (memcmp(reg->hr_nego_node_bitmap, live_node_bitmap,
418 				sizeof(reg->hr_nego_node_bitmap))) {
419 			/* check negotiate bitmap every second to do timeout
420 			 * approve decision.
421 			 */
422 			schedule_delayed_work(&reg->hr_nego_timeout_work,
423 				msecs_to_jiffies(1000));
424 
425 			return;
426 		}
427 
428 		printk(KERN_NOTICE "o2hb: all nodes hb write hung, maybe region %s (%s) is down.\n",
429 			config_item_name(&reg->hr_item), reg->hr_dev_name);
430 		/* approve negotiate timeout request. */
431 		o2hb_arm_timeout(reg);
432 
433 		i = -1;
434 		while ((i = find_next_bit(live_node_bitmap,
435 				O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
436 			if (i == master_node)
437 				continue;
438 
439 			mlog(ML_HEARTBEAT, "send NEGO_APPROVE msg to node %d\n", i);
440 			ret = o2hb_send_nego_msg(reg->hr_key,
441 					O2HB_NEGO_APPROVE_MSG, i);
442 			if (ret)
443 				mlog(ML_ERROR, "send NEGO_APPROVE msg to node %d fail %d\n",
444 					i, ret);
445 		}
446 	} else {
447 		/* negotiate timeout with master node. */
448 		printk(KERN_NOTICE "o2hb: node %d hb write hung for %ds on region %s (%s), negotiate timeout with node %d.\n",
449 			o2nm_this_node(), O2HB_NEGO_TIMEOUT_MS/1000, config_item_name(&reg->hr_item),
450 			reg->hr_dev_name, master_node);
451 		ret = o2hb_send_nego_msg(reg->hr_key, O2HB_NEGO_TIMEOUT_MSG,
452 				master_node);
453 		if (ret)
454 			mlog(ML_ERROR, "send NEGO_TIMEOUT msg to node %d fail %d\n",
455 				master_node, ret);
456 	}
457 }
458 
459 static int o2hb_nego_timeout_handler(struct o2net_msg *msg, u32 len, void *data,
460 				void **ret_data)
461 {
462 	struct o2hb_region *reg = data;
463 	struct o2hb_nego_msg *nego_msg;
464 
465 	nego_msg = (struct o2hb_nego_msg *)msg->buf;
466 	printk(KERN_NOTICE "o2hb: receive negotiate timeout message from node %d on region %s (%s).\n",
467 		nego_msg->node_num, config_item_name(&reg->hr_item), reg->hr_dev_name);
468 	if (nego_msg->node_num < O2NM_MAX_NODES)
469 		set_bit(nego_msg->node_num, reg->hr_nego_node_bitmap);
470 	else
471 		mlog(ML_ERROR, "got nego timeout message from bad node.\n");
472 
473 	return 0;
474 }
475 
476 static int o2hb_nego_approve_handler(struct o2net_msg *msg, u32 len, void *data,
477 				void **ret_data)
478 {
479 	struct o2hb_region *reg = data;
480 
481 	printk(KERN_NOTICE "o2hb: negotiate timeout approved by master node on region %s (%s).\n",
482 		config_item_name(&reg->hr_item), reg->hr_dev_name);
483 	o2hb_arm_timeout(reg);
484 	return 0;
485 }
486 
487 static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc)
488 {
489 	atomic_set(&wc->wc_num_reqs, 1);
490 	init_completion(&wc->wc_io_complete);
491 	wc->wc_error = 0;
492 }
493 
494 /* Used in error paths too */
495 static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc,
496 				     unsigned int num)
497 {
498 	/* sadly atomic_sub_and_test() isn't available on all platforms.  The
499 	 * good news is that the fast path only completes one at a time */
500 	while(num--) {
501 		if (atomic_dec_and_test(&wc->wc_num_reqs)) {
502 			BUG_ON(num > 0);
503 			complete(&wc->wc_io_complete);
504 		}
505 	}
506 }
507 
508 static void o2hb_wait_on_io(struct o2hb_region *reg,
509 			    struct o2hb_bio_wait_ctxt *wc)
510 {
511 	o2hb_bio_wait_dec(wc, 1);
512 	wait_for_completion(&wc->wc_io_complete);
513 }
514 
515 static void o2hb_bio_end_io(struct bio *bio)
516 {
517 	struct o2hb_bio_wait_ctxt *wc = bio->bi_private;
518 
519 	if (bio->bi_error) {
520 		mlog(ML_ERROR, "IO Error %d\n", bio->bi_error);
521 		wc->wc_error = bio->bi_error;
522 	}
523 
524 	o2hb_bio_wait_dec(wc, 1);
525 	bio_put(bio);
526 }
527 
528 /* Setup a Bio to cover I/O against num_slots slots starting at
529  * start_slot. */
530 static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
531 				      struct o2hb_bio_wait_ctxt *wc,
532 				      unsigned int *current_slot,
533 				      unsigned int max_slots)
534 {
535 	int len, current_page;
536 	unsigned int vec_len, vec_start;
537 	unsigned int bits = reg->hr_block_bits;
538 	unsigned int spp = reg->hr_slots_per_page;
539 	unsigned int cs = *current_slot;
540 	struct bio *bio;
541 	struct page *page;
542 
543 	/* Testing has shown this allocation to take long enough under
544 	 * GFP_KERNEL that the local node can get fenced. It would be
545 	 * nicest if we could pre-allocate these bios and avoid this
546 	 * all together. */
547 	bio = bio_alloc(GFP_ATOMIC, 16);
548 	if (!bio) {
549 		mlog(ML_ERROR, "Could not alloc slots BIO!\n");
550 		bio = ERR_PTR(-ENOMEM);
551 		goto bail;
552 	}
553 
554 	/* Must put everything in 512 byte sectors for the bio... */
555 	bio->bi_iter.bi_sector = (reg->hr_start_block + cs) << (bits - 9);
556 	bio->bi_bdev = reg->hr_bdev;
557 	bio->bi_private = wc;
558 	bio->bi_end_io = o2hb_bio_end_io;
559 
560 	vec_start = (cs << bits) % PAGE_SIZE;
561 	while(cs < max_slots) {
562 		current_page = cs / spp;
563 		page = reg->hr_slot_data[current_page];
564 
565 		vec_len = min(PAGE_SIZE - vec_start,
566 			      (max_slots-cs) * (PAGE_SIZE/spp) );
567 
568 		mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",
569 		     current_page, vec_len, vec_start);
570 
571 		len = bio_add_page(bio, page, vec_len, vec_start);
572 		if (len != vec_len) break;
573 
574 		cs += vec_len / (PAGE_SIZE/spp);
575 		vec_start = 0;
576 	}
577 
578 bail:
579 	*current_slot = cs;
580 	return bio;
581 }
582 
583 static int o2hb_read_slots(struct o2hb_region *reg,
584 			   unsigned int max_slots)
585 {
586 	unsigned int current_slot=0;
587 	int status;
588 	struct o2hb_bio_wait_ctxt wc;
589 	struct bio *bio;
590 
591 	o2hb_bio_wait_init(&wc);
592 
593 	while(current_slot < max_slots) {
594 		bio = o2hb_setup_one_bio(reg, &wc, &current_slot, max_slots);
595 		if (IS_ERR(bio)) {
596 			status = PTR_ERR(bio);
597 			mlog_errno(status);
598 			goto bail_and_wait;
599 		}
600 
601 		atomic_inc(&wc.wc_num_reqs);
602 		submit_bio(READ, bio);
603 	}
604 
605 	status = 0;
606 
607 bail_and_wait:
608 	o2hb_wait_on_io(reg, &wc);
609 	if (wc.wc_error && !status)
610 		status = wc.wc_error;
611 
612 	return status;
613 }
614 
615 static int o2hb_issue_node_write(struct o2hb_region *reg,
616 				 struct o2hb_bio_wait_ctxt *write_wc)
617 {
618 	int status;
619 	unsigned int slot;
620 	struct bio *bio;
621 
622 	o2hb_bio_wait_init(write_wc);
623 
624 	slot = o2nm_this_node();
625 
626 	bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1);
627 	if (IS_ERR(bio)) {
628 		status = PTR_ERR(bio);
629 		mlog_errno(status);
630 		goto bail;
631 	}
632 
633 	atomic_inc(&write_wc->wc_num_reqs);
634 	submit_bio(WRITE_SYNC, bio);
635 
636 	status = 0;
637 bail:
638 	return status;
639 }
640 
641 static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg,
642 				     struct o2hb_disk_heartbeat_block *hb_block)
643 {
644 	__le32 old_cksum;
645 	u32 ret;
646 
647 	/* We want to compute the block crc with a 0 value in the
648 	 * hb_cksum field. Save it off here and replace after the
649 	 * crc. */
650 	old_cksum = hb_block->hb_cksum;
651 	hb_block->hb_cksum = 0;
652 
653 	ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes);
654 
655 	hb_block->hb_cksum = old_cksum;
656 
657 	return ret;
658 }
659 
660 static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block)
661 {
662 	mlog(ML_ERROR, "Dump slot information: seq = 0x%llx, node = %u, "
663 	     "cksum = 0x%x, generation 0x%llx\n",
664 	     (long long)le64_to_cpu(hb_block->hb_seq),
665 	     hb_block->hb_node, le32_to_cpu(hb_block->hb_cksum),
666 	     (long long)le64_to_cpu(hb_block->hb_generation));
667 }
668 
669 static int o2hb_verify_crc(struct o2hb_region *reg,
670 			   struct o2hb_disk_heartbeat_block *hb_block)
671 {
672 	u32 read, computed;
673 
674 	read = le32_to_cpu(hb_block->hb_cksum);
675 	computed = o2hb_compute_block_crc_le(reg, hb_block);
676 
677 	return read == computed;
678 }
679 
680 /*
681  * Compare the slot data with what we wrote in the last iteration.
682  * If the match fails, print an appropriate error message. This is to
683  * detect errors like... another node hearting on the same slot,
684  * flaky device that is losing writes, etc.
685  * Returns 1 if check succeeds, 0 otherwise.
686  */
687 static int o2hb_check_own_slot(struct o2hb_region *reg)
688 {
689 	struct o2hb_disk_slot *slot;
690 	struct o2hb_disk_heartbeat_block *hb_block;
691 	char *errstr;
692 
693 	slot = &reg->hr_slots[o2nm_this_node()];
694 	/* Don't check on our 1st timestamp */
695 	if (!slot->ds_last_time)
696 		return 0;
697 
698 	hb_block = slot->ds_raw_block;
699 	if (le64_to_cpu(hb_block->hb_seq) == slot->ds_last_time &&
700 	    le64_to_cpu(hb_block->hb_generation) == slot->ds_last_generation &&
701 	    hb_block->hb_node == slot->ds_node_num)
702 		return 1;
703 
704 #define ERRSTR1		"Another node is heartbeating on device"
705 #define ERRSTR2		"Heartbeat generation mismatch on device"
706 #define ERRSTR3		"Heartbeat sequence mismatch on device"
707 
708 	if (hb_block->hb_node != slot->ds_node_num)
709 		errstr = ERRSTR1;
710 	else if (le64_to_cpu(hb_block->hb_generation) !=
711 		 slot->ds_last_generation)
712 		errstr = ERRSTR2;
713 	else
714 		errstr = ERRSTR3;
715 
716 	mlog(ML_ERROR, "%s (%s): expected(%u:0x%llx, 0x%llx), "
717 	     "ondisk(%u:0x%llx, 0x%llx)\n", errstr, reg->hr_dev_name,
718 	     slot->ds_node_num, (unsigned long long)slot->ds_last_generation,
719 	     (unsigned long long)slot->ds_last_time, hb_block->hb_node,
720 	     (unsigned long long)le64_to_cpu(hb_block->hb_generation),
721 	     (unsigned long long)le64_to_cpu(hb_block->hb_seq));
722 
723 	return 0;
724 }
725 
726 static inline void o2hb_prepare_block(struct o2hb_region *reg,
727 				      u64 generation)
728 {
729 	int node_num;
730 	u64 cputime;
731 	struct o2hb_disk_slot *slot;
732 	struct o2hb_disk_heartbeat_block *hb_block;
733 
734 	node_num = o2nm_this_node();
735 	slot = &reg->hr_slots[node_num];
736 
737 	hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block;
738 	memset(hb_block, 0, reg->hr_block_bytes);
739 	/* TODO: time stuff */
740 	cputime = CURRENT_TIME.tv_sec;
741 	if (!cputime)
742 		cputime = 1;
743 
744 	hb_block->hb_seq = cpu_to_le64(cputime);
745 	hb_block->hb_node = node_num;
746 	hb_block->hb_generation = cpu_to_le64(generation);
747 	hb_block->hb_dead_ms = cpu_to_le32(o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS);
748 
749 	/* This step must always happen last! */
750 	hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg,
751 								   hb_block));
752 
753 	mlog(ML_HB_BIO, "our node generation = 0x%llx, cksum = 0x%x\n",
754 	     (long long)generation,
755 	     le32_to_cpu(hb_block->hb_cksum));
756 }
757 
758 static void o2hb_fire_callbacks(struct o2hb_callback *hbcall,
759 				struct o2nm_node *node,
760 				int idx)
761 {
762 	struct o2hb_callback_func *f;
763 
764 	list_for_each_entry(f, &hbcall->list, hc_item) {
765 		mlog(ML_HEARTBEAT, "calling funcs %p\n", f);
766 		(f->hc_func)(node, idx, f->hc_data);
767 	}
768 }
769 
770 /* Will run the list in order until we process the passed event */
771 static void o2hb_run_event_list(struct o2hb_node_event *queued_event)
772 {
773 	struct o2hb_callback *hbcall;
774 	struct o2hb_node_event *event;
775 
776 	/* Holding callback sem assures we don't alter the callback
777 	 * lists when doing this, and serializes ourselves with other
778 	 * processes wanting callbacks. */
779 	down_write(&o2hb_callback_sem);
780 
781 	spin_lock(&o2hb_live_lock);
782 	while (!list_empty(&o2hb_node_events)
783 	       && !list_empty(&queued_event->hn_item)) {
784 		event = list_entry(o2hb_node_events.next,
785 				   struct o2hb_node_event,
786 				   hn_item);
787 		list_del_init(&event->hn_item);
788 		spin_unlock(&o2hb_live_lock);
789 
790 		mlog(ML_HEARTBEAT, "Node %s event for %d\n",
791 		     event->hn_event_type == O2HB_NODE_UP_CB ? "UP" : "DOWN",
792 		     event->hn_node_num);
793 
794 		hbcall = hbcall_from_type(event->hn_event_type);
795 
796 		/* We should *never* have gotten on to the list with a
797 		 * bad type... This isn't something that we should try
798 		 * to recover from. */
799 		BUG_ON(IS_ERR(hbcall));
800 
801 		o2hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num);
802 
803 		spin_lock(&o2hb_live_lock);
804 	}
805 	spin_unlock(&o2hb_live_lock);
806 
807 	up_write(&o2hb_callback_sem);
808 }
809 
810 static void o2hb_queue_node_event(struct o2hb_node_event *event,
811 				  enum o2hb_callback_type type,
812 				  struct o2nm_node *node,
813 				  int node_num)
814 {
815 	assert_spin_locked(&o2hb_live_lock);
816 
817 	BUG_ON((!node) && (type != O2HB_NODE_DOWN_CB));
818 
819 	event->hn_event_type = type;
820 	event->hn_node = node;
821 	event->hn_node_num = node_num;
822 
823 	mlog(ML_HEARTBEAT, "Queue node %s event for node %d\n",
824 	     type == O2HB_NODE_UP_CB ? "UP" : "DOWN", node_num);
825 
826 	list_add_tail(&event->hn_item, &o2hb_node_events);
827 }
828 
829 static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot)
830 {
831 	struct o2hb_node_event event =
832 		{ .hn_item = LIST_HEAD_INIT(event.hn_item), };
833 	struct o2nm_node *node;
834 	int queued = 0;
835 
836 	node = o2nm_get_node_by_num(slot->ds_node_num);
837 	if (!node)
838 		return;
839 
840 	spin_lock(&o2hb_live_lock);
841 	if (!list_empty(&slot->ds_live_item)) {
842 		mlog(ML_HEARTBEAT, "Shutdown, node %d leaves region\n",
843 		     slot->ds_node_num);
844 
845 		list_del_init(&slot->ds_live_item);
846 
847 		if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
848 			clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
849 
850 			o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
851 					      slot->ds_node_num);
852 			queued = 1;
853 		}
854 	}
855 	spin_unlock(&o2hb_live_lock);
856 
857 	if (queued)
858 		o2hb_run_event_list(&event);
859 
860 	o2nm_node_put(node);
861 }
862 
863 static void o2hb_set_quorum_device(struct o2hb_region *reg)
864 {
865 	if (!o2hb_global_heartbeat_active())
866 		return;
867 
868 	/* Prevent race with o2hb_heartbeat_group_drop_item() */
869 	if (kthread_should_stop())
870 		return;
871 
872 	/* Tag region as quorum only after thread reaches steady state */
873 	if (atomic_read(&reg->hr_steady_iterations) != 0)
874 		return;
875 
876 	spin_lock(&o2hb_live_lock);
877 
878 	if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
879 		goto unlock;
880 
881 	/*
882 	 * A region can be added to the quorum only when it sees all
883 	 * live nodes heartbeat on it. In other words, the region has been
884 	 * added to all nodes.
885 	 */
886 	if (memcmp(reg->hr_live_node_bitmap, o2hb_live_node_bitmap,
887 		   sizeof(o2hb_live_node_bitmap)))
888 		goto unlock;
889 
890 	printk(KERN_NOTICE "o2hb: Region %s (%s) is now a quorum device\n",
891 	       config_item_name(&reg->hr_item), reg->hr_dev_name);
892 
893 	set_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
894 
895 	/*
896 	 * If global heartbeat active, unpin all regions if the
897 	 * region count > CUT_OFF
898 	 */
899 	if (bitmap_weight(o2hb_quorum_region_bitmap,
900 			   O2NM_MAX_REGIONS) > O2HB_PIN_CUT_OFF)
901 		o2hb_region_unpin(NULL);
902 unlock:
903 	spin_unlock(&o2hb_live_lock);
904 }
905 
906 static int o2hb_check_slot(struct o2hb_region *reg,
907 			   struct o2hb_disk_slot *slot)
908 {
909 	int changed = 0, gen_changed = 0;
910 	struct o2hb_node_event event =
911 		{ .hn_item = LIST_HEAD_INIT(event.hn_item), };
912 	struct o2nm_node *node;
913 	struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block;
914 	u64 cputime;
915 	unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS;
916 	unsigned int slot_dead_ms;
917 	int tmp;
918 	int queued = 0;
919 
920 	memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes);
921 
922 	/*
923 	 * If a node is no longer configured but is still in the livemap, we
924 	 * may need to clear that bit from the livemap.
925 	 */
926 	node = o2nm_get_node_by_num(slot->ds_node_num);
927 	if (!node) {
928 		spin_lock(&o2hb_live_lock);
929 		tmp = test_bit(slot->ds_node_num, o2hb_live_node_bitmap);
930 		spin_unlock(&o2hb_live_lock);
931 		if (!tmp)
932 			return 0;
933 	}
934 
935 	if (!o2hb_verify_crc(reg, hb_block)) {
936 		/* all paths from here will drop o2hb_live_lock for
937 		 * us. */
938 		spin_lock(&o2hb_live_lock);
939 
940 		/* Don't print an error on the console in this case -
941 		 * a freshly formatted heartbeat area will not have a
942 		 * crc set on it. */
943 		if (list_empty(&slot->ds_live_item))
944 			goto out;
945 
946 		/* The node is live but pushed out a bad crc. We
947 		 * consider it a transient miss but don't populate any
948 		 * other values as they may be junk. */
949 		mlog(ML_ERROR, "Node %d has written a bad crc to %s\n",
950 		     slot->ds_node_num, reg->hr_dev_name);
951 		o2hb_dump_slot(hb_block);
952 
953 		slot->ds_equal_samples++;
954 		goto fire_callbacks;
955 	}
956 
957 	/* we don't care if these wrap.. the state transitions below
958 	 * clear at the right places */
959 	cputime = le64_to_cpu(hb_block->hb_seq);
960 	if (slot->ds_last_time != cputime)
961 		slot->ds_changed_samples++;
962 	else
963 		slot->ds_equal_samples++;
964 	slot->ds_last_time = cputime;
965 
966 	/* The node changed heartbeat generations. We assume this to
967 	 * mean it dropped off but came back before we timed out. We
968 	 * want to consider it down for the time being but don't want
969 	 * to lose any changed_samples state we might build up to
970 	 * considering it live again. */
971 	if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) {
972 		gen_changed = 1;
973 		slot->ds_equal_samples = 0;
974 		mlog(ML_HEARTBEAT, "Node %d changed generation (0x%llx "
975 		     "to 0x%llx)\n", slot->ds_node_num,
976 		     (long long)slot->ds_last_generation,
977 		     (long long)le64_to_cpu(hb_block->hb_generation));
978 	}
979 
980 	slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
981 
982 	mlog(ML_HEARTBEAT, "Slot %d gen 0x%llx cksum 0x%x "
983 	     "seq %llu last %llu changed %u equal %u\n",
984 	     slot->ds_node_num, (long long)slot->ds_last_generation,
985 	     le32_to_cpu(hb_block->hb_cksum),
986 	     (unsigned long long)le64_to_cpu(hb_block->hb_seq),
987 	     (unsigned long long)slot->ds_last_time, slot->ds_changed_samples,
988 	     slot->ds_equal_samples);
989 
990 	spin_lock(&o2hb_live_lock);
991 
992 fire_callbacks:
993 	/* dead nodes only come to life after some number of
994 	 * changes at any time during their dead time */
995 	if (list_empty(&slot->ds_live_item) &&
996 	    slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) {
997 		mlog(ML_HEARTBEAT, "Node %d (id 0x%llx) joined my region\n",
998 		     slot->ds_node_num, (long long)slot->ds_last_generation);
999 
1000 		set_bit(slot->ds_node_num, reg->hr_live_node_bitmap);
1001 
1002 		/* first on the list generates a callback */
1003 		if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
1004 			mlog(ML_HEARTBEAT, "o2hb: Add node %d to live nodes "
1005 			     "bitmap\n", slot->ds_node_num);
1006 			set_bit(slot->ds_node_num, o2hb_live_node_bitmap);
1007 
1008 			o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node,
1009 					      slot->ds_node_num);
1010 
1011 			changed = 1;
1012 			queued = 1;
1013 		}
1014 
1015 		list_add_tail(&slot->ds_live_item,
1016 			      &o2hb_live_slots[slot->ds_node_num]);
1017 
1018 		slot->ds_equal_samples = 0;
1019 
1020 		/* We want to be sure that all nodes agree on the
1021 		 * number of milliseconds before a node will be
1022 		 * considered dead. The self-fencing timeout is
1023 		 * computed from this value, and a discrepancy might
1024 		 * result in heartbeat calling a node dead when it
1025 		 * hasn't self-fenced yet. */
1026 		slot_dead_ms = le32_to_cpu(hb_block->hb_dead_ms);
1027 		if (slot_dead_ms && slot_dead_ms != dead_ms) {
1028 			/* TODO: Perhaps we can fail the region here. */
1029 			mlog(ML_ERROR, "Node %d on device %s has a dead count "
1030 			     "of %u ms, but our count is %u ms.\n"
1031 			     "Please double check your configuration values "
1032 			     "for 'O2CB_HEARTBEAT_THRESHOLD'\n",
1033 			     slot->ds_node_num, reg->hr_dev_name, slot_dead_ms,
1034 			     dead_ms);
1035 		}
1036 		goto out;
1037 	}
1038 
1039 	/* if the list is dead, we're done.. */
1040 	if (list_empty(&slot->ds_live_item))
1041 		goto out;
1042 
1043 	/* live nodes only go dead after enough consequtive missed
1044 	 * samples..  reset the missed counter whenever we see
1045 	 * activity */
1046 	if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) {
1047 		mlog(ML_HEARTBEAT, "Node %d left my region\n",
1048 		     slot->ds_node_num);
1049 
1050 		clear_bit(slot->ds_node_num, reg->hr_live_node_bitmap);
1051 
1052 		/* last off the live_slot generates a callback */
1053 		list_del_init(&slot->ds_live_item);
1054 		if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
1055 			mlog(ML_HEARTBEAT, "o2hb: Remove node %d from live "
1056 			     "nodes bitmap\n", slot->ds_node_num);
1057 			clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
1058 
1059 			/* node can be null */
1060 			o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB,
1061 					      node, slot->ds_node_num);
1062 
1063 			changed = 1;
1064 			queued = 1;
1065 		}
1066 
1067 		/* We don't clear this because the node is still
1068 		 * actually writing new blocks. */
1069 		if (!gen_changed)
1070 			slot->ds_changed_samples = 0;
1071 		goto out;
1072 	}
1073 	if (slot->ds_changed_samples) {
1074 		slot->ds_changed_samples = 0;
1075 		slot->ds_equal_samples = 0;
1076 	}
1077 out:
1078 	spin_unlock(&o2hb_live_lock);
1079 
1080 	if (queued)
1081 		o2hb_run_event_list(&event);
1082 
1083 	if (node)
1084 		o2nm_node_put(node);
1085 	return changed;
1086 }
1087 
1088 static int o2hb_highest_node(unsigned long *nodes, int numbits)
1089 {
1090 	return find_last_bit(nodes, numbits);
1091 }
1092 
1093 static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
1094 {
1095 	int i, ret, highest_node;
1096 	int membership_change = 0, own_slot_ok = 0;
1097 	unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
1098 	unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
1099 	struct o2hb_bio_wait_ctxt write_wc;
1100 
1101 	ret = o2nm_configured_node_map(configured_nodes,
1102 				       sizeof(configured_nodes));
1103 	if (ret) {
1104 		mlog_errno(ret);
1105 		goto bail;
1106 	}
1107 
1108 	/*
1109 	 * If a node is not configured but is in the livemap, we still need
1110 	 * to read the slot so as to be able to remove it from the livemap.
1111 	 */
1112 	o2hb_fill_node_map(live_node_bitmap, sizeof(live_node_bitmap));
1113 	i = -1;
1114 	while ((i = find_next_bit(live_node_bitmap,
1115 				  O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
1116 		set_bit(i, configured_nodes);
1117 	}
1118 
1119 	highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES);
1120 	if (highest_node >= O2NM_MAX_NODES) {
1121 		mlog(ML_NOTICE, "o2hb: No configured nodes found!\n");
1122 		ret = -EINVAL;
1123 		goto bail;
1124 	}
1125 
1126 	/* No sense in reading the slots of nodes that don't exist
1127 	 * yet. Of course, if the node definitions have holes in them
1128 	 * then we're reading an empty slot anyway... Consider this
1129 	 * best-effort. */
1130 	ret = o2hb_read_slots(reg, highest_node + 1);
1131 	if (ret < 0) {
1132 		mlog_errno(ret);
1133 		goto bail;
1134 	}
1135 
1136 	/* With an up to date view of the slots, we can check that no
1137 	 * other node has been improperly configured to heartbeat in
1138 	 * our slot. */
1139 	own_slot_ok = o2hb_check_own_slot(reg);
1140 
1141 	/* fill in the proper info for our next heartbeat */
1142 	o2hb_prepare_block(reg, reg->hr_generation);
1143 
1144 	ret = o2hb_issue_node_write(reg, &write_wc);
1145 	if (ret < 0) {
1146 		mlog_errno(ret);
1147 		goto bail;
1148 	}
1149 
1150 	i = -1;
1151 	while((i = find_next_bit(configured_nodes,
1152 				 O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
1153 		membership_change |= o2hb_check_slot(reg, &reg->hr_slots[i]);
1154 	}
1155 
1156 	/*
1157 	 * We have to be sure we've advertised ourselves on disk
1158 	 * before we can go to steady state.  This ensures that
1159 	 * people we find in our steady state have seen us.
1160 	 */
1161 	o2hb_wait_on_io(reg, &write_wc);
1162 	if (write_wc.wc_error) {
1163 		/* Do not re-arm the write timeout on I/O error - we
1164 		 * can't be sure that the new block ever made it to
1165 		 * disk */
1166 		mlog(ML_ERROR, "Write error %d on device \"%s\"\n",
1167 		     write_wc.wc_error, reg->hr_dev_name);
1168 		ret = write_wc.wc_error;
1169 		goto bail;
1170 	}
1171 
1172 	/* Skip disarming the timeout if own slot has stale/bad data */
1173 	if (own_slot_ok) {
1174 		o2hb_set_quorum_device(reg);
1175 		o2hb_arm_timeout(reg);
1176 		reg->hr_last_timeout_start = jiffies;
1177 	}
1178 
1179 bail:
1180 	/* let the person who launched us know when things are steady */
1181 	if (atomic_read(&reg->hr_steady_iterations) != 0) {
1182 		if (!ret && own_slot_ok && !membership_change) {
1183 			if (atomic_dec_and_test(&reg->hr_steady_iterations))
1184 				wake_up(&o2hb_steady_queue);
1185 		}
1186 	}
1187 
1188 	if (atomic_read(&reg->hr_steady_iterations) != 0) {
1189 		if (atomic_dec_and_test(&reg->hr_unsteady_iterations)) {
1190 			printk(KERN_NOTICE "o2hb: Unable to stabilize "
1191 			       "heartbeart on region %s (%s)\n",
1192 			       config_item_name(&reg->hr_item),
1193 			       reg->hr_dev_name);
1194 			atomic_set(&reg->hr_steady_iterations, 0);
1195 			reg->hr_aborted_start = 1;
1196 			wake_up(&o2hb_steady_queue);
1197 			ret = -EIO;
1198 		}
1199 	}
1200 
1201 	return ret;
1202 }
1203 
1204 /*
1205  * we ride the region ref that the region dir holds.  before the region
1206  * dir is removed and drops it ref it will wait to tear down this
1207  * thread.
1208  */
1209 static int o2hb_thread(void *data)
1210 {
1211 	int i, ret;
1212 	struct o2hb_region *reg = data;
1213 	struct o2hb_bio_wait_ctxt write_wc;
1214 	ktime_t before_hb, after_hb;
1215 	unsigned int elapsed_msec;
1216 
1217 	mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n");
1218 
1219 	set_user_nice(current, MIN_NICE);
1220 
1221 	/* Pin node */
1222 	ret = o2nm_depend_this_node();
1223 	if (ret) {
1224 		mlog(ML_ERROR, "Node has been deleted, ret = %d\n", ret);
1225 		reg->hr_node_deleted = 1;
1226 		wake_up(&o2hb_steady_queue);
1227 		return 0;
1228 	}
1229 
1230 	while (!kthread_should_stop() &&
1231 	       !reg->hr_unclean_stop && !reg->hr_aborted_start) {
1232 		/* We track the time spent inside
1233 		 * o2hb_do_disk_heartbeat so that we avoid more than
1234 		 * hr_timeout_ms between disk writes. On busy systems
1235 		 * this should result in a heartbeat which is less
1236 		 * likely to time itself out. */
1237 		before_hb = ktime_get_real();
1238 
1239 		ret = o2hb_do_disk_heartbeat(reg);
1240 		reg->hr_last_hb_status = ret;
1241 
1242 		after_hb = ktime_get_real();
1243 
1244 		elapsed_msec = (unsigned int)
1245 				ktime_ms_delta(after_hb, before_hb);
1246 
1247 		mlog(ML_HEARTBEAT,
1248 		     "start = %lld, end = %lld, msec = %u, ret = %d\n",
1249 		     before_hb.tv64, after_hb.tv64, elapsed_msec, ret);
1250 
1251 		if (!kthread_should_stop() &&
1252 		    elapsed_msec < reg->hr_timeout_ms) {
1253 			/* the kthread api has blocked signals for us so no
1254 			 * need to record the return value. */
1255 			msleep_interruptible(reg->hr_timeout_ms - elapsed_msec);
1256 		}
1257 	}
1258 
1259 	o2hb_disarm_timeout(reg);
1260 
1261 	/* unclean stop is only used in very bad situation */
1262 	for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++)
1263 		o2hb_shutdown_slot(&reg->hr_slots[i]);
1264 
1265 	/* Explicit down notification - avoid forcing the other nodes
1266 	 * to timeout on this region when we could just as easily
1267 	 * write a clear generation - thus indicating to them that
1268 	 * this node has left this region.
1269 	 */
1270 	if (!reg->hr_unclean_stop && !reg->hr_aborted_start) {
1271 		o2hb_prepare_block(reg, 0);
1272 		ret = o2hb_issue_node_write(reg, &write_wc);
1273 		if (ret == 0)
1274 			o2hb_wait_on_io(reg, &write_wc);
1275 		else
1276 			mlog_errno(ret);
1277 	}
1278 
1279 	/* Unpin node */
1280 	o2nm_undepend_this_node();
1281 
1282 	mlog(ML_HEARTBEAT|ML_KTHREAD, "o2hb thread exiting\n");
1283 
1284 	return 0;
1285 }
1286 
1287 #ifdef CONFIG_DEBUG_FS
1288 static int o2hb_debug_open(struct inode *inode, struct file *file)
1289 {
1290 	struct o2hb_debug_buf *db = inode->i_private;
1291 	struct o2hb_region *reg;
1292 	unsigned long map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1293 	unsigned long lts;
1294 	char *buf = NULL;
1295 	int i = -1;
1296 	int out = 0;
1297 
1298 	/* max_nodes should be the largest bitmap we pass here */
1299 	BUG_ON(sizeof(map) < db->db_size);
1300 
1301 	buf = kmalloc(PAGE_SIZE, GFP_KERNEL);
1302 	if (!buf)
1303 		goto bail;
1304 
1305 	switch (db->db_type) {
1306 	case O2HB_DB_TYPE_LIVENODES:
1307 	case O2HB_DB_TYPE_LIVEREGIONS:
1308 	case O2HB_DB_TYPE_QUORUMREGIONS:
1309 	case O2HB_DB_TYPE_FAILEDREGIONS:
1310 		spin_lock(&o2hb_live_lock);
1311 		memcpy(map, db->db_data, db->db_size);
1312 		spin_unlock(&o2hb_live_lock);
1313 		break;
1314 
1315 	case O2HB_DB_TYPE_REGION_LIVENODES:
1316 		spin_lock(&o2hb_live_lock);
1317 		reg = (struct o2hb_region *)db->db_data;
1318 		memcpy(map, reg->hr_live_node_bitmap, db->db_size);
1319 		spin_unlock(&o2hb_live_lock);
1320 		break;
1321 
1322 	case O2HB_DB_TYPE_REGION_NUMBER:
1323 		reg = (struct o2hb_region *)db->db_data;
1324 		out += snprintf(buf + out, PAGE_SIZE - out, "%d\n",
1325 				reg->hr_region_num);
1326 		goto done;
1327 
1328 	case O2HB_DB_TYPE_REGION_ELAPSED_TIME:
1329 		reg = (struct o2hb_region *)db->db_data;
1330 		lts = reg->hr_last_timeout_start;
1331 		/* If 0, it has never been set before */
1332 		if (lts)
1333 			lts = jiffies_to_msecs(jiffies - lts);
1334 		out += snprintf(buf + out, PAGE_SIZE - out, "%lu\n", lts);
1335 		goto done;
1336 
1337 	case O2HB_DB_TYPE_REGION_PINNED:
1338 		reg = (struct o2hb_region *)db->db_data;
1339 		out += snprintf(buf + out, PAGE_SIZE - out, "%u\n",
1340 				!!reg->hr_item_pinned);
1341 		goto done;
1342 
1343 	default:
1344 		goto done;
1345 	}
1346 
1347 	while ((i = find_next_bit(map, db->db_len, i + 1)) < db->db_len)
1348 		out += snprintf(buf + out, PAGE_SIZE - out, "%d ", i);
1349 	out += snprintf(buf + out, PAGE_SIZE - out, "\n");
1350 
1351 done:
1352 	i_size_write(inode, out);
1353 
1354 	file->private_data = buf;
1355 
1356 	return 0;
1357 bail:
1358 	return -ENOMEM;
1359 }
1360 
1361 static int o2hb_debug_release(struct inode *inode, struct file *file)
1362 {
1363 	kfree(file->private_data);
1364 	return 0;
1365 }
1366 
1367 static ssize_t o2hb_debug_read(struct file *file, char __user *buf,
1368 				 size_t nbytes, loff_t *ppos)
1369 {
1370 	return simple_read_from_buffer(buf, nbytes, ppos, file->private_data,
1371 				       i_size_read(file->f_mapping->host));
1372 }
1373 #else
1374 static int o2hb_debug_open(struct inode *inode, struct file *file)
1375 {
1376 	return 0;
1377 }
1378 static int o2hb_debug_release(struct inode *inode, struct file *file)
1379 {
1380 	return 0;
1381 }
1382 static ssize_t o2hb_debug_read(struct file *file, char __user *buf,
1383 			       size_t nbytes, loff_t *ppos)
1384 {
1385 	return 0;
1386 }
1387 #endif  /* CONFIG_DEBUG_FS */
1388 
1389 static const struct file_operations o2hb_debug_fops = {
1390 	.open =		o2hb_debug_open,
1391 	.release =	o2hb_debug_release,
1392 	.read =		o2hb_debug_read,
1393 	.llseek =	generic_file_llseek,
1394 };
1395 
1396 void o2hb_exit(void)
1397 {
1398 	debugfs_remove(o2hb_debug_failedregions);
1399 	debugfs_remove(o2hb_debug_quorumregions);
1400 	debugfs_remove(o2hb_debug_liveregions);
1401 	debugfs_remove(o2hb_debug_livenodes);
1402 	debugfs_remove(o2hb_debug_dir);
1403 	kfree(o2hb_db_livenodes);
1404 	kfree(o2hb_db_liveregions);
1405 	kfree(o2hb_db_quorumregions);
1406 	kfree(o2hb_db_failedregions);
1407 }
1408 
1409 static struct dentry *o2hb_debug_create(const char *name, struct dentry *dir,
1410 					struct o2hb_debug_buf **db, int db_len,
1411 					int type, int size, int len, void *data)
1412 {
1413 	*db = kmalloc(db_len, GFP_KERNEL);
1414 	if (!*db)
1415 		return NULL;
1416 
1417 	(*db)->db_type = type;
1418 	(*db)->db_size = size;
1419 	(*db)->db_len = len;
1420 	(*db)->db_data = data;
1421 
1422 	return debugfs_create_file(name, S_IFREG|S_IRUSR, dir, *db,
1423 				   &o2hb_debug_fops);
1424 }
1425 
1426 static int o2hb_debug_init(void)
1427 {
1428 	int ret = -ENOMEM;
1429 
1430 	o2hb_debug_dir = debugfs_create_dir(O2HB_DEBUG_DIR, NULL);
1431 	if (!o2hb_debug_dir) {
1432 		mlog_errno(ret);
1433 		goto bail;
1434 	}
1435 
1436 	o2hb_debug_livenodes = o2hb_debug_create(O2HB_DEBUG_LIVENODES,
1437 						 o2hb_debug_dir,
1438 						 &o2hb_db_livenodes,
1439 						 sizeof(*o2hb_db_livenodes),
1440 						 O2HB_DB_TYPE_LIVENODES,
1441 						 sizeof(o2hb_live_node_bitmap),
1442 						 O2NM_MAX_NODES,
1443 						 o2hb_live_node_bitmap);
1444 	if (!o2hb_debug_livenodes) {
1445 		mlog_errno(ret);
1446 		goto bail;
1447 	}
1448 
1449 	o2hb_debug_liveregions = o2hb_debug_create(O2HB_DEBUG_LIVEREGIONS,
1450 						   o2hb_debug_dir,
1451 						   &o2hb_db_liveregions,
1452 						   sizeof(*o2hb_db_liveregions),
1453 						   O2HB_DB_TYPE_LIVEREGIONS,
1454 						   sizeof(o2hb_live_region_bitmap),
1455 						   O2NM_MAX_REGIONS,
1456 						   o2hb_live_region_bitmap);
1457 	if (!o2hb_debug_liveregions) {
1458 		mlog_errno(ret);
1459 		goto bail;
1460 	}
1461 
1462 	o2hb_debug_quorumregions =
1463 			o2hb_debug_create(O2HB_DEBUG_QUORUMREGIONS,
1464 					  o2hb_debug_dir,
1465 					  &o2hb_db_quorumregions,
1466 					  sizeof(*o2hb_db_quorumregions),
1467 					  O2HB_DB_TYPE_QUORUMREGIONS,
1468 					  sizeof(o2hb_quorum_region_bitmap),
1469 					  O2NM_MAX_REGIONS,
1470 					  o2hb_quorum_region_bitmap);
1471 	if (!o2hb_debug_quorumregions) {
1472 		mlog_errno(ret);
1473 		goto bail;
1474 	}
1475 
1476 	o2hb_debug_failedregions =
1477 			o2hb_debug_create(O2HB_DEBUG_FAILEDREGIONS,
1478 					  o2hb_debug_dir,
1479 					  &o2hb_db_failedregions,
1480 					  sizeof(*o2hb_db_failedregions),
1481 					  O2HB_DB_TYPE_FAILEDREGIONS,
1482 					  sizeof(o2hb_failed_region_bitmap),
1483 					  O2NM_MAX_REGIONS,
1484 					  o2hb_failed_region_bitmap);
1485 	if (!o2hb_debug_failedregions) {
1486 		mlog_errno(ret);
1487 		goto bail;
1488 	}
1489 
1490 	ret = 0;
1491 bail:
1492 	if (ret)
1493 		o2hb_exit();
1494 
1495 	return ret;
1496 }
1497 
1498 int o2hb_init(void)
1499 {
1500 	int i;
1501 
1502 	for (i = 0; i < ARRAY_SIZE(o2hb_callbacks); i++)
1503 		INIT_LIST_HEAD(&o2hb_callbacks[i].list);
1504 
1505 	for (i = 0; i < ARRAY_SIZE(o2hb_live_slots); i++)
1506 		INIT_LIST_HEAD(&o2hb_live_slots[i]);
1507 
1508 	INIT_LIST_HEAD(&o2hb_node_events);
1509 
1510 	memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap));
1511 	memset(o2hb_region_bitmap, 0, sizeof(o2hb_region_bitmap));
1512 	memset(o2hb_live_region_bitmap, 0, sizeof(o2hb_live_region_bitmap));
1513 	memset(o2hb_quorum_region_bitmap, 0, sizeof(o2hb_quorum_region_bitmap));
1514 	memset(o2hb_failed_region_bitmap, 0, sizeof(o2hb_failed_region_bitmap));
1515 
1516 	o2hb_dependent_users = 0;
1517 
1518 	return o2hb_debug_init();
1519 }
1520 
1521 /* if we're already in a callback then we're already serialized by the sem */
1522 static void o2hb_fill_node_map_from_callback(unsigned long *map,
1523 					     unsigned bytes)
1524 {
1525 	BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long)));
1526 
1527 	memcpy(map, &o2hb_live_node_bitmap, bytes);
1528 }
1529 
1530 /*
1531  * get a map of all nodes that are heartbeating in any regions
1532  */
1533 void o2hb_fill_node_map(unsigned long *map, unsigned bytes)
1534 {
1535 	/* callers want to serialize this map and callbacks so that they
1536 	 * can trust that they don't miss nodes coming to the party */
1537 	down_read(&o2hb_callback_sem);
1538 	spin_lock(&o2hb_live_lock);
1539 	o2hb_fill_node_map_from_callback(map, bytes);
1540 	spin_unlock(&o2hb_live_lock);
1541 	up_read(&o2hb_callback_sem);
1542 }
1543 EXPORT_SYMBOL_GPL(o2hb_fill_node_map);
1544 
1545 /*
1546  * heartbeat configfs bits.  The heartbeat set is a default set under
1547  * the cluster set in nodemanager.c.
1548  */
1549 
1550 static struct o2hb_region *to_o2hb_region(struct config_item *item)
1551 {
1552 	return item ? container_of(item, struct o2hb_region, hr_item) : NULL;
1553 }
1554 
1555 /* drop_item only drops its ref after killing the thread, nothing should
1556  * be using the region anymore.  this has to clean up any state that
1557  * attributes might have built up. */
1558 static void o2hb_region_release(struct config_item *item)
1559 {
1560 	int i;
1561 	struct page *page;
1562 	struct o2hb_region *reg = to_o2hb_region(item);
1563 
1564 	mlog(ML_HEARTBEAT, "hb region release (%s)\n", reg->hr_dev_name);
1565 
1566 	kfree(reg->hr_tmp_block);
1567 
1568 	if (reg->hr_slot_data) {
1569 		for (i = 0; i < reg->hr_num_pages; i++) {
1570 			page = reg->hr_slot_data[i];
1571 			if (page)
1572 				__free_page(page);
1573 		}
1574 		kfree(reg->hr_slot_data);
1575 	}
1576 
1577 	if (reg->hr_bdev)
1578 		blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE);
1579 
1580 	kfree(reg->hr_slots);
1581 
1582 	debugfs_remove(reg->hr_debug_livenodes);
1583 	debugfs_remove(reg->hr_debug_regnum);
1584 	debugfs_remove(reg->hr_debug_elapsed_time);
1585 	debugfs_remove(reg->hr_debug_pinned);
1586 	debugfs_remove(reg->hr_debug_dir);
1587 	kfree(reg->hr_db_livenodes);
1588 	kfree(reg->hr_db_regnum);
1589 	kfree(reg->hr_db_elapsed_time);
1590 	kfree(reg->hr_db_pinned);
1591 
1592 	spin_lock(&o2hb_live_lock);
1593 	list_del(&reg->hr_all_item);
1594 	spin_unlock(&o2hb_live_lock);
1595 
1596 	o2net_unregister_handler_list(&reg->hr_handler_list);
1597 	kfree(reg);
1598 }
1599 
1600 static int o2hb_read_block_input(struct o2hb_region *reg,
1601 				 const char *page,
1602 				 unsigned long *ret_bytes,
1603 				 unsigned int *ret_bits)
1604 {
1605 	unsigned long bytes;
1606 	char *p = (char *)page;
1607 
1608 	bytes = simple_strtoul(p, &p, 0);
1609 	if (!p || (*p && (*p != '\n')))
1610 		return -EINVAL;
1611 
1612 	/* Heartbeat and fs min / max block sizes are the same. */
1613 	if (bytes > 4096 || bytes < 512)
1614 		return -ERANGE;
1615 	if (hweight16(bytes) != 1)
1616 		return -EINVAL;
1617 
1618 	if (ret_bytes)
1619 		*ret_bytes = bytes;
1620 	if (ret_bits)
1621 		*ret_bits = ffs(bytes) - 1;
1622 
1623 	return 0;
1624 }
1625 
1626 static ssize_t o2hb_region_block_bytes_show(struct config_item *item,
1627 					    char *page)
1628 {
1629 	return sprintf(page, "%u\n", to_o2hb_region(item)->hr_block_bytes);
1630 }
1631 
1632 static ssize_t o2hb_region_block_bytes_store(struct config_item *item,
1633 					     const char *page,
1634 					     size_t count)
1635 {
1636 	struct o2hb_region *reg = to_o2hb_region(item);
1637 	int status;
1638 	unsigned long block_bytes;
1639 	unsigned int block_bits;
1640 
1641 	if (reg->hr_bdev)
1642 		return -EINVAL;
1643 
1644 	status = o2hb_read_block_input(reg, page, &block_bytes,
1645 				       &block_bits);
1646 	if (status)
1647 		return status;
1648 
1649 	reg->hr_block_bytes = (unsigned int)block_bytes;
1650 	reg->hr_block_bits = block_bits;
1651 
1652 	return count;
1653 }
1654 
1655 static ssize_t o2hb_region_start_block_show(struct config_item *item,
1656 					    char *page)
1657 {
1658 	return sprintf(page, "%llu\n", to_o2hb_region(item)->hr_start_block);
1659 }
1660 
1661 static ssize_t o2hb_region_start_block_store(struct config_item *item,
1662 					     const char *page,
1663 					     size_t count)
1664 {
1665 	struct o2hb_region *reg = to_o2hb_region(item);
1666 	unsigned long long tmp;
1667 	char *p = (char *)page;
1668 
1669 	if (reg->hr_bdev)
1670 		return -EINVAL;
1671 
1672 	tmp = simple_strtoull(p, &p, 0);
1673 	if (!p || (*p && (*p != '\n')))
1674 		return -EINVAL;
1675 
1676 	reg->hr_start_block = tmp;
1677 
1678 	return count;
1679 }
1680 
1681 static ssize_t o2hb_region_blocks_show(struct config_item *item, char *page)
1682 {
1683 	return sprintf(page, "%d\n", to_o2hb_region(item)->hr_blocks);
1684 }
1685 
1686 static ssize_t o2hb_region_blocks_store(struct config_item *item,
1687 					const char *page,
1688 					size_t count)
1689 {
1690 	struct o2hb_region *reg = to_o2hb_region(item);
1691 	unsigned long tmp;
1692 	char *p = (char *)page;
1693 
1694 	if (reg->hr_bdev)
1695 		return -EINVAL;
1696 
1697 	tmp = simple_strtoul(p, &p, 0);
1698 	if (!p || (*p && (*p != '\n')))
1699 		return -EINVAL;
1700 
1701 	if (tmp > O2NM_MAX_NODES || tmp == 0)
1702 		return -ERANGE;
1703 
1704 	reg->hr_blocks = (unsigned int)tmp;
1705 
1706 	return count;
1707 }
1708 
1709 static ssize_t o2hb_region_dev_show(struct config_item *item, char *page)
1710 {
1711 	unsigned int ret = 0;
1712 
1713 	if (to_o2hb_region(item)->hr_bdev)
1714 		ret = sprintf(page, "%s\n", to_o2hb_region(item)->hr_dev_name);
1715 
1716 	return ret;
1717 }
1718 
1719 static void o2hb_init_region_params(struct o2hb_region *reg)
1720 {
1721 	reg->hr_slots_per_page = PAGE_SIZE >> reg->hr_block_bits;
1722 	reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS;
1723 
1724 	mlog(ML_HEARTBEAT, "hr_start_block = %llu, hr_blocks = %u\n",
1725 	     reg->hr_start_block, reg->hr_blocks);
1726 	mlog(ML_HEARTBEAT, "hr_block_bytes = %u, hr_block_bits = %u\n",
1727 	     reg->hr_block_bytes, reg->hr_block_bits);
1728 	mlog(ML_HEARTBEAT, "hr_timeout_ms = %u\n", reg->hr_timeout_ms);
1729 	mlog(ML_HEARTBEAT, "dead threshold = %u\n", o2hb_dead_threshold);
1730 }
1731 
1732 static int o2hb_map_slot_data(struct o2hb_region *reg)
1733 {
1734 	int i, j;
1735 	unsigned int last_slot;
1736 	unsigned int spp = reg->hr_slots_per_page;
1737 	struct page *page;
1738 	char *raw;
1739 	struct o2hb_disk_slot *slot;
1740 
1741 	reg->hr_tmp_block = kmalloc(reg->hr_block_bytes, GFP_KERNEL);
1742 	if (reg->hr_tmp_block == NULL)
1743 		return -ENOMEM;
1744 
1745 	reg->hr_slots = kcalloc(reg->hr_blocks,
1746 				sizeof(struct o2hb_disk_slot), GFP_KERNEL);
1747 	if (reg->hr_slots == NULL)
1748 		return -ENOMEM;
1749 
1750 	for(i = 0; i < reg->hr_blocks; i++) {
1751 		slot = &reg->hr_slots[i];
1752 		slot->ds_node_num = i;
1753 		INIT_LIST_HEAD(&slot->ds_live_item);
1754 		slot->ds_raw_block = NULL;
1755 	}
1756 
1757 	reg->hr_num_pages = (reg->hr_blocks + spp - 1) / spp;
1758 	mlog(ML_HEARTBEAT, "Going to require %u pages to cover %u blocks "
1759 			   "at %u blocks per page\n",
1760 	     reg->hr_num_pages, reg->hr_blocks, spp);
1761 
1762 	reg->hr_slot_data = kcalloc(reg->hr_num_pages, sizeof(struct page *),
1763 				    GFP_KERNEL);
1764 	if (!reg->hr_slot_data)
1765 		return -ENOMEM;
1766 
1767 	for(i = 0; i < reg->hr_num_pages; i++) {
1768 		page = alloc_page(GFP_KERNEL);
1769 		if (!page)
1770 			return -ENOMEM;
1771 
1772 		reg->hr_slot_data[i] = page;
1773 
1774 		last_slot = i * spp;
1775 		raw = page_address(page);
1776 		for (j = 0;
1777 		     (j < spp) && ((j + last_slot) < reg->hr_blocks);
1778 		     j++) {
1779 			BUG_ON((j + last_slot) >= reg->hr_blocks);
1780 
1781 			slot = &reg->hr_slots[j + last_slot];
1782 			slot->ds_raw_block =
1783 				(struct o2hb_disk_heartbeat_block *) raw;
1784 
1785 			raw += reg->hr_block_bytes;
1786 		}
1787 	}
1788 
1789 	return 0;
1790 }
1791 
1792 /* Read in all the slots available and populate the tracking
1793  * structures so that we can start with a baseline idea of what's
1794  * there. */
1795 static int o2hb_populate_slot_data(struct o2hb_region *reg)
1796 {
1797 	int ret, i;
1798 	struct o2hb_disk_slot *slot;
1799 	struct o2hb_disk_heartbeat_block *hb_block;
1800 
1801 	ret = o2hb_read_slots(reg, reg->hr_blocks);
1802 	if (ret)
1803 		goto out;
1804 
1805 	/* We only want to get an idea of the values initially in each
1806 	 * slot, so we do no verification - o2hb_check_slot will
1807 	 * actually determine if each configured slot is valid and
1808 	 * whether any values have changed. */
1809 	for(i = 0; i < reg->hr_blocks; i++) {
1810 		slot = &reg->hr_slots[i];
1811 		hb_block = (struct o2hb_disk_heartbeat_block *) slot->ds_raw_block;
1812 
1813 		/* Only fill the values that o2hb_check_slot uses to
1814 		 * determine changing slots */
1815 		slot->ds_last_time = le64_to_cpu(hb_block->hb_seq);
1816 		slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
1817 	}
1818 
1819 out:
1820 	return ret;
1821 }
1822 
1823 /* this is acting as commit; we set up all of hr_bdev and hr_task or nothing */
1824 static ssize_t o2hb_region_dev_store(struct config_item *item,
1825 				     const char *page,
1826 				     size_t count)
1827 {
1828 	struct o2hb_region *reg = to_o2hb_region(item);
1829 	struct task_struct *hb_task;
1830 	long fd;
1831 	int sectsize;
1832 	char *p = (char *)page;
1833 	struct fd f;
1834 	struct inode *inode;
1835 	ssize_t ret = -EINVAL;
1836 	int live_threshold;
1837 
1838 	if (reg->hr_bdev)
1839 		goto out;
1840 
1841 	/* We can't heartbeat without having had our node number
1842 	 * configured yet. */
1843 	if (o2nm_this_node() == O2NM_MAX_NODES)
1844 		goto out;
1845 
1846 	fd = simple_strtol(p, &p, 0);
1847 	if (!p || (*p && (*p != '\n')))
1848 		goto out;
1849 
1850 	if (fd < 0 || fd >= INT_MAX)
1851 		goto out;
1852 
1853 	f = fdget(fd);
1854 	if (f.file == NULL)
1855 		goto out;
1856 
1857 	if (reg->hr_blocks == 0 || reg->hr_start_block == 0 ||
1858 	    reg->hr_block_bytes == 0)
1859 		goto out2;
1860 
1861 	inode = igrab(f.file->f_mapping->host);
1862 	if (inode == NULL)
1863 		goto out2;
1864 
1865 	if (!S_ISBLK(inode->i_mode))
1866 		goto out3;
1867 
1868 	reg->hr_bdev = I_BDEV(f.file->f_mapping->host);
1869 	ret = blkdev_get(reg->hr_bdev, FMODE_WRITE | FMODE_READ, NULL);
1870 	if (ret) {
1871 		reg->hr_bdev = NULL;
1872 		goto out3;
1873 	}
1874 	inode = NULL;
1875 
1876 	bdevname(reg->hr_bdev, reg->hr_dev_name);
1877 
1878 	sectsize = bdev_logical_block_size(reg->hr_bdev);
1879 	if (sectsize != reg->hr_block_bytes) {
1880 		mlog(ML_ERROR,
1881 		     "blocksize %u incorrect for device, expected %d",
1882 		     reg->hr_block_bytes, sectsize);
1883 		ret = -EINVAL;
1884 		goto out3;
1885 	}
1886 
1887 	o2hb_init_region_params(reg);
1888 
1889 	/* Generation of zero is invalid */
1890 	do {
1891 		get_random_bytes(&reg->hr_generation,
1892 				 sizeof(reg->hr_generation));
1893 	} while (reg->hr_generation == 0);
1894 
1895 	ret = o2hb_map_slot_data(reg);
1896 	if (ret) {
1897 		mlog_errno(ret);
1898 		goto out3;
1899 	}
1900 
1901 	ret = o2hb_populate_slot_data(reg);
1902 	if (ret) {
1903 		mlog_errno(ret);
1904 		goto out3;
1905 	}
1906 
1907 	INIT_DELAYED_WORK(&reg->hr_write_timeout_work, o2hb_write_timeout);
1908 	INIT_DELAYED_WORK(&reg->hr_nego_timeout_work, o2hb_nego_timeout);
1909 
1910 	/*
1911 	 * A node is considered live after it has beat LIVE_THRESHOLD
1912 	 * times.  We're not steady until we've given them a chance
1913 	 * _after_ our first read.
1914 	 * The default threshold is bare minimum so as to limit the delay
1915 	 * during mounts. For global heartbeat, the threshold doubled for the
1916 	 * first region.
1917 	 */
1918 	live_threshold = O2HB_LIVE_THRESHOLD;
1919 	if (o2hb_global_heartbeat_active()) {
1920 		spin_lock(&o2hb_live_lock);
1921 		if (bitmap_weight(o2hb_region_bitmap, O2NM_MAX_REGIONS) == 1)
1922 			live_threshold <<= 1;
1923 		spin_unlock(&o2hb_live_lock);
1924 	}
1925 	++live_threshold;
1926 	atomic_set(&reg->hr_steady_iterations, live_threshold);
1927 	/* unsteady_iterations is triple the steady_iterations */
1928 	atomic_set(&reg->hr_unsteady_iterations, (live_threshold * 3));
1929 
1930 	hb_task = kthread_run(o2hb_thread, reg, "o2hb-%s",
1931 			      reg->hr_item.ci_name);
1932 	if (IS_ERR(hb_task)) {
1933 		ret = PTR_ERR(hb_task);
1934 		mlog_errno(ret);
1935 		goto out3;
1936 	}
1937 
1938 	spin_lock(&o2hb_live_lock);
1939 	reg->hr_task = hb_task;
1940 	spin_unlock(&o2hb_live_lock);
1941 
1942 	ret = wait_event_interruptible(o2hb_steady_queue,
1943 				atomic_read(&reg->hr_steady_iterations) == 0 ||
1944 				reg->hr_node_deleted);
1945 	if (ret) {
1946 		atomic_set(&reg->hr_steady_iterations, 0);
1947 		reg->hr_aborted_start = 1;
1948 	}
1949 
1950 	if (reg->hr_aborted_start) {
1951 		ret = -EIO;
1952 		goto out3;
1953 	}
1954 
1955 	if (reg->hr_node_deleted) {
1956 		ret = -EINVAL;
1957 		goto out3;
1958 	}
1959 
1960 	/* Ok, we were woken.  Make sure it wasn't by drop_item() */
1961 	spin_lock(&o2hb_live_lock);
1962 	hb_task = reg->hr_task;
1963 	if (o2hb_global_heartbeat_active())
1964 		set_bit(reg->hr_region_num, o2hb_live_region_bitmap);
1965 	spin_unlock(&o2hb_live_lock);
1966 
1967 	if (hb_task)
1968 		ret = count;
1969 	else
1970 		ret = -EIO;
1971 
1972 	if (hb_task && o2hb_global_heartbeat_active())
1973 		printk(KERN_NOTICE "o2hb: Heartbeat started on region %s (%s)\n",
1974 		       config_item_name(&reg->hr_item), reg->hr_dev_name);
1975 
1976 out3:
1977 	iput(inode);
1978 out2:
1979 	fdput(f);
1980 out:
1981 	if (ret < 0) {
1982 		if (reg->hr_bdev) {
1983 			blkdev_put(reg->hr_bdev, FMODE_READ|FMODE_WRITE);
1984 			reg->hr_bdev = NULL;
1985 		}
1986 	}
1987 	return ret;
1988 }
1989 
1990 static ssize_t o2hb_region_pid_show(struct config_item *item, char *page)
1991 {
1992 	struct o2hb_region *reg = to_o2hb_region(item);
1993 	pid_t pid = 0;
1994 
1995 	spin_lock(&o2hb_live_lock);
1996 	if (reg->hr_task)
1997 		pid = task_pid_nr(reg->hr_task);
1998 	spin_unlock(&o2hb_live_lock);
1999 
2000 	if (!pid)
2001 		return 0;
2002 
2003 	return sprintf(page, "%u\n", pid);
2004 }
2005 
2006 CONFIGFS_ATTR(o2hb_region_, block_bytes);
2007 CONFIGFS_ATTR(o2hb_region_, start_block);
2008 CONFIGFS_ATTR(o2hb_region_, blocks);
2009 CONFIGFS_ATTR(o2hb_region_, dev);
2010 CONFIGFS_ATTR_RO(o2hb_region_, pid);
2011 
2012 static struct configfs_attribute *o2hb_region_attrs[] = {
2013 	&o2hb_region_attr_block_bytes,
2014 	&o2hb_region_attr_start_block,
2015 	&o2hb_region_attr_blocks,
2016 	&o2hb_region_attr_dev,
2017 	&o2hb_region_attr_pid,
2018 	NULL,
2019 };
2020 
2021 static struct configfs_item_operations o2hb_region_item_ops = {
2022 	.release		= o2hb_region_release,
2023 };
2024 
2025 static struct config_item_type o2hb_region_type = {
2026 	.ct_item_ops	= &o2hb_region_item_ops,
2027 	.ct_attrs	= o2hb_region_attrs,
2028 	.ct_owner	= THIS_MODULE,
2029 };
2030 
2031 /* heartbeat set */
2032 
2033 struct o2hb_heartbeat_group {
2034 	struct config_group hs_group;
2035 	/* some stuff? */
2036 };
2037 
2038 static struct o2hb_heartbeat_group *to_o2hb_heartbeat_group(struct config_group *group)
2039 {
2040 	return group ?
2041 		container_of(group, struct o2hb_heartbeat_group, hs_group)
2042 		: NULL;
2043 }
2044 
2045 static int o2hb_debug_region_init(struct o2hb_region *reg, struct dentry *dir)
2046 {
2047 	int ret = -ENOMEM;
2048 
2049 	reg->hr_debug_dir =
2050 		debugfs_create_dir(config_item_name(&reg->hr_item), dir);
2051 	if (!reg->hr_debug_dir) {
2052 		mlog_errno(ret);
2053 		goto bail;
2054 	}
2055 
2056 	reg->hr_debug_livenodes =
2057 			o2hb_debug_create(O2HB_DEBUG_LIVENODES,
2058 					  reg->hr_debug_dir,
2059 					  &(reg->hr_db_livenodes),
2060 					  sizeof(*(reg->hr_db_livenodes)),
2061 					  O2HB_DB_TYPE_REGION_LIVENODES,
2062 					  sizeof(reg->hr_live_node_bitmap),
2063 					  O2NM_MAX_NODES, reg);
2064 	if (!reg->hr_debug_livenodes) {
2065 		mlog_errno(ret);
2066 		goto bail;
2067 	}
2068 
2069 	reg->hr_debug_regnum =
2070 			o2hb_debug_create(O2HB_DEBUG_REGION_NUMBER,
2071 					  reg->hr_debug_dir,
2072 					  &(reg->hr_db_regnum),
2073 					  sizeof(*(reg->hr_db_regnum)),
2074 					  O2HB_DB_TYPE_REGION_NUMBER,
2075 					  0, O2NM_MAX_NODES, reg);
2076 	if (!reg->hr_debug_regnum) {
2077 		mlog_errno(ret);
2078 		goto bail;
2079 	}
2080 
2081 	reg->hr_debug_elapsed_time =
2082 			o2hb_debug_create(O2HB_DEBUG_REGION_ELAPSED_TIME,
2083 					  reg->hr_debug_dir,
2084 					  &(reg->hr_db_elapsed_time),
2085 					  sizeof(*(reg->hr_db_elapsed_time)),
2086 					  O2HB_DB_TYPE_REGION_ELAPSED_TIME,
2087 					  0, 0, reg);
2088 	if (!reg->hr_debug_elapsed_time) {
2089 		mlog_errno(ret);
2090 		goto bail;
2091 	}
2092 
2093 	reg->hr_debug_pinned =
2094 			o2hb_debug_create(O2HB_DEBUG_REGION_PINNED,
2095 					  reg->hr_debug_dir,
2096 					  &(reg->hr_db_pinned),
2097 					  sizeof(*(reg->hr_db_pinned)),
2098 					  O2HB_DB_TYPE_REGION_PINNED,
2099 					  0, 0, reg);
2100 	if (!reg->hr_debug_pinned) {
2101 		mlog_errno(ret);
2102 		goto bail;
2103 	}
2104 
2105 	ret = 0;
2106 bail:
2107 	return ret;
2108 }
2109 
2110 static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *group,
2111 							  const char *name)
2112 {
2113 	struct o2hb_region *reg = NULL;
2114 	int ret;
2115 
2116 	reg = kzalloc(sizeof(struct o2hb_region), GFP_KERNEL);
2117 	if (reg == NULL)
2118 		return ERR_PTR(-ENOMEM);
2119 
2120 	if (strlen(name) > O2HB_MAX_REGION_NAME_LEN) {
2121 		ret = -ENAMETOOLONG;
2122 		goto free;
2123 	}
2124 
2125 	spin_lock(&o2hb_live_lock);
2126 	reg->hr_region_num = 0;
2127 	if (o2hb_global_heartbeat_active()) {
2128 		reg->hr_region_num = find_first_zero_bit(o2hb_region_bitmap,
2129 							 O2NM_MAX_REGIONS);
2130 		if (reg->hr_region_num >= O2NM_MAX_REGIONS) {
2131 			spin_unlock(&o2hb_live_lock);
2132 			ret = -EFBIG;
2133 			goto free;
2134 		}
2135 		set_bit(reg->hr_region_num, o2hb_region_bitmap);
2136 	}
2137 	list_add_tail(&reg->hr_all_item, &o2hb_all_regions);
2138 	spin_unlock(&o2hb_live_lock);
2139 
2140 	config_item_init_type_name(&reg->hr_item, name, &o2hb_region_type);
2141 
2142 	/* this is the same way to generate msg key as dlm, for local heartbeat,
2143 	 * name is also the same, so make initial crc value different to avoid
2144 	 * message key conflict.
2145 	 */
2146 	reg->hr_key = crc32_le(reg->hr_region_num + O2NM_MAX_REGIONS,
2147 		name, strlen(name));
2148 	INIT_LIST_HEAD(&reg->hr_handler_list);
2149 	ret = o2net_register_handler(O2HB_NEGO_TIMEOUT_MSG, reg->hr_key,
2150 			sizeof(struct o2hb_nego_msg),
2151 			o2hb_nego_timeout_handler,
2152 			reg, NULL, &reg->hr_handler_list);
2153 	if (ret)
2154 		goto free;
2155 
2156 	ret = o2net_register_handler(O2HB_NEGO_APPROVE_MSG, reg->hr_key,
2157 			sizeof(struct o2hb_nego_msg),
2158 			o2hb_nego_approve_handler,
2159 			reg, NULL, &reg->hr_handler_list);
2160 	if (ret)
2161 		goto unregister_handler;
2162 
2163 	ret = o2hb_debug_region_init(reg, o2hb_debug_dir);
2164 	if (ret) {
2165 		config_item_put(&reg->hr_item);
2166 		goto unregister_handler;
2167 	}
2168 
2169 	return &reg->hr_item;
2170 
2171 unregister_handler:
2172 	o2net_unregister_handler_list(&reg->hr_handler_list);
2173 free:
2174 	kfree(reg);
2175 	return ERR_PTR(ret);
2176 }
2177 
2178 static void o2hb_heartbeat_group_drop_item(struct config_group *group,
2179 					   struct config_item *item)
2180 {
2181 	struct task_struct *hb_task;
2182 	struct o2hb_region *reg = to_o2hb_region(item);
2183 	int quorum_region = 0;
2184 
2185 	/* stop the thread when the user removes the region dir */
2186 	spin_lock(&o2hb_live_lock);
2187 	hb_task = reg->hr_task;
2188 	reg->hr_task = NULL;
2189 	reg->hr_item_dropped = 1;
2190 	spin_unlock(&o2hb_live_lock);
2191 
2192 	if (hb_task)
2193 		kthread_stop(hb_task);
2194 
2195 	if (o2hb_global_heartbeat_active()) {
2196 		spin_lock(&o2hb_live_lock);
2197 		clear_bit(reg->hr_region_num, o2hb_region_bitmap);
2198 		clear_bit(reg->hr_region_num, o2hb_live_region_bitmap);
2199 		if (test_bit(reg->hr_region_num, o2hb_quorum_region_bitmap))
2200 			quorum_region = 1;
2201 		clear_bit(reg->hr_region_num, o2hb_quorum_region_bitmap);
2202 		spin_unlock(&o2hb_live_lock);
2203 		printk(KERN_NOTICE "o2hb: Heartbeat %s on region %s (%s)\n",
2204 		       ((atomic_read(&reg->hr_steady_iterations) == 0) ?
2205 			"stopped" : "start aborted"), config_item_name(item),
2206 		       reg->hr_dev_name);
2207 	}
2208 
2209 	/*
2210 	 * If we're racing a dev_write(), we need to wake them.  They will
2211 	 * check reg->hr_task
2212 	 */
2213 	if (atomic_read(&reg->hr_steady_iterations) != 0) {
2214 		reg->hr_aborted_start = 1;
2215 		atomic_set(&reg->hr_steady_iterations, 0);
2216 		wake_up(&o2hb_steady_queue);
2217 	}
2218 
2219 	config_item_put(item);
2220 
2221 	if (!o2hb_global_heartbeat_active() || !quorum_region)
2222 		return;
2223 
2224 	/*
2225 	 * If global heartbeat active and there are dependent users,
2226 	 * pin all regions if quorum region count <= CUT_OFF
2227 	 */
2228 	spin_lock(&o2hb_live_lock);
2229 
2230 	if (!o2hb_dependent_users)
2231 		goto unlock;
2232 
2233 	if (bitmap_weight(o2hb_quorum_region_bitmap,
2234 			   O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF)
2235 		o2hb_region_pin(NULL);
2236 
2237 unlock:
2238 	spin_unlock(&o2hb_live_lock);
2239 }
2240 
2241 static ssize_t o2hb_heartbeat_group_threshold_show(struct config_item *item,
2242 		char *page)
2243 {
2244 	return sprintf(page, "%u\n", o2hb_dead_threshold);
2245 }
2246 
2247 static ssize_t o2hb_heartbeat_group_threshold_store(struct config_item *item,
2248 		const char *page, size_t count)
2249 {
2250 	unsigned long tmp;
2251 	char *p = (char *)page;
2252 
2253 	tmp = simple_strtoul(p, &p, 10);
2254 	if (!p || (*p && (*p != '\n')))
2255                 return -EINVAL;
2256 
2257 	/* this will validate ranges for us. */
2258 	o2hb_dead_threshold_set((unsigned int) tmp);
2259 
2260 	return count;
2261 }
2262 
2263 static ssize_t o2hb_heartbeat_group_mode_show(struct config_item *item,
2264 		char *page)
2265 {
2266 	return sprintf(page, "%s\n",
2267 		       o2hb_heartbeat_mode_desc[o2hb_heartbeat_mode]);
2268 }
2269 
2270 static ssize_t o2hb_heartbeat_group_mode_store(struct config_item *item,
2271 		const char *page, size_t count)
2272 {
2273 	unsigned int i;
2274 	int ret;
2275 	size_t len;
2276 
2277 	len = (page[count - 1] == '\n') ? count - 1 : count;
2278 	if (!len)
2279 		return -EINVAL;
2280 
2281 	for (i = 0; i < O2HB_HEARTBEAT_NUM_MODES; ++i) {
2282 		if (strncasecmp(page, o2hb_heartbeat_mode_desc[i], len))
2283 			continue;
2284 
2285 		ret = o2hb_global_heartbeat_mode_set(i);
2286 		if (!ret)
2287 			printk(KERN_NOTICE "o2hb: Heartbeat mode set to %s\n",
2288 			       o2hb_heartbeat_mode_desc[i]);
2289 		return count;
2290 	}
2291 
2292 	return -EINVAL;
2293 
2294 }
2295 
2296 CONFIGFS_ATTR(o2hb_heartbeat_group_, threshold);
2297 CONFIGFS_ATTR(o2hb_heartbeat_group_, mode);
2298 
2299 static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = {
2300 	&o2hb_heartbeat_group_attr_threshold,
2301 	&o2hb_heartbeat_group_attr_mode,
2302 	NULL,
2303 };
2304 
2305 static struct configfs_group_operations o2hb_heartbeat_group_group_ops = {
2306 	.make_item	= o2hb_heartbeat_group_make_item,
2307 	.drop_item	= o2hb_heartbeat_group_drop_item,
2308 };
2309 
2310 static struct config_item_type o2hb_heartbeat_group_type = {
2311 	.ct_group_ops	= &o2hb_heartbeat_group_group_ops,
2312 	.ct_attrs	= o2hb_heartbeat_group_attrs,
2313 	.ct_owner	= THIS_MODULE,
2314 };
2315 
2316 /* this is just here to avoid touching group in heartbeat.h which the
2317  * entire damn world #includes */
2318 struct config_group *o2hb_alloc_hb_set(void)
2319 {
2320 	struct o2hb_heartbeat_group *hs = NULL;
2321 	struct config_group *ret = NULL;
2322 
2323 	hs = kzalloc(sizeof(struct o2hb_heartbeat_group), GFP_KERNEL);
2324 	if (hs == NULL)
2325 		goto out;
2326 
2327 	config_group_init_type_name(&hs->hs_group, "heartbeat",
2328 				    &o2hb_heartbeat_group_type);
2329 
2330 	ret = &hs->hs_group;
2331 out:
2332 	if (ret == NULL)
2333 		kfree(hs);
2334 	return ret;
2335 }
2336 
2337 void o2hb_free_hb_set(struct config_group *group)
2338 {
2339 	struct o2hb_heartbeat_group *hs = to_o2hb_heartbeat_group(group);
2340 	kfree(hs);
2341 }
2342 
2343 /* hb callback registration and issuing */
2344 
2345 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type)
2346 {
2347 	if (type == O2HB_NUM_CB)
2348 		return ERR_PTR(-EINVAL);
2349 
2350 	return &o2hb_callbacks[type];
2351 }
2352 
2353 void o2hb_setup_callback(struct o2hb_callback_func *hc,
2354 			 enum o2hb_callback_type type,
2355 			 o2hb_cb_func *func,
2356 			 void *data,
2357 			 int priority)
2358 {
2359 	INIT_LIST_HEAD(&hc->hc_item);
2360 	hc->hc_func = func;
2361 	hc->hc_data = data;
2362 	hc->hc_priority = priority;
2363 	hc->hc_type = type;
2364 	hc->hc_magic = O2HB_CB_MAGIC;
2365 }
2366 EXPORT_SYMBOL_GPL(o2hb_setup_callback);
2367 
2368 /*
2369  * In local heartbeat mode, region_uuid passed matches the dlm domain name.
2370  * In global heartbeat mode, region_uuid passed is NULL.
2371  *
2372  * In local, we only pin the matching region. In global we pin all the active
2373  * regions.
2374  */
2375 static int o2hb_region_pin(const char *region_uuid)
2376 {
2377 	int ret = 0, found = 0;
2378 	struct o2hb_region *reg;
2379 	char *uuid;
2380 
2381 	assert_spin_locked(&o2hb_live_lock);
2382 
2383 	list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2384 		if (reg->hr_item_dropped)
2385 			continue;
2386 
2387 		uuid = config_item_name(&reg->hr_item);
2388 
2389 		/* local heartbeat */
2390 		if (region_uuid) {
2391 			if (strcmp(region_uuid, uuid))
2392 				continue;
2393 			found = 1;
2394 		}
2395 
2396 		if (reg->hr_item_pinned || reg->hr_item_dropped)
2397 			goto skip_pin;
2398 
2399 		/* Ignore ENOENT only for local hb (userdlm domain) */
2400 		ret = o2nm_depend_item(&reg->hr_item);
2401 		if (!ret) {
2402 			mlog(ML_CLUSTER, "Pin region %s\n", uuid);
2403 			reg->hr_item_pinned = 1;
2404 		} else {
2405 			if (ret == -ENOENT && found)
2406 				ret = 0;
2407 			else {
2408 				mlog(ML_ERROR, "Pin region %s fails with %d\n",
2409 				     uuid, ret);
2410 				break;
2411 			}
2412 		}
2413 skip_pin:
2414 		if (found)
2415 			break;
2416 	}
2417 
2418 	return ret;
2419 }
2420 
2421 /*
2422  * In local heartbeat mode, region_uuid passed matches the dlm domain name.
2423  * In global heartbeat mode, region_uuid passed is NULL.
2424  *
2425  * In local, we only unpin the matching region. In global we unpin all the
2426  * active regions.
2427  */
2428 static void o2hb_region_unpin(const char *region_uuid)
2429 {
2430 	struct o2hb_region *reg;
2431 	char *uuid;
2432 	int found = 0;
2433 
2434 	assert_spin_locked(&o2hb_live_lock);
2435 
2436 	list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2437 		if (reg->hr_item_dropped)
2438 			continue;
2439 
2440 		uuid = config_item_name(&reg->hr_item);
2441 		if (region_uuid) {
2442 			if (strcmp(region_uuid, uuid))
2443 				continue;
2444 			found = 1;
2445 		}
2446 
2447 		if (reg->hr_item_pinned) {
2448 			mlog(ML_CLUSTER, "Unpin region %s\n", uuid);
2449 			o2nm_undepend_item(&reg->hr_item);
2450 			reg->hr_item_pinned = 0;
2451 		}
2452 		if (found)
2453 			break;
2454 	}
2455 }
2456 
2457 static int o2hb_region_inc_user(const char *region_uuid)
2458 {
2459 	int ret = 0;
2460 
2461 	spin_lock(&o2hb_live_lock);
2462 
2463 	/* local heartbeat */
2464 	if (!o2hb_global_heartbeat_active()) {
2465 	    ret = o2hb_region_pin(region_uuid);
2466 	    goto unlock;
2467 	}
2468 
2469 	/*
2470 	 * if global heartbeat active and this is the first dependent user,
2471 	 * pin all regions if quorum region count <= CUT_OFF
2472 	 */
2473 	o2hb_dependent_users++;
2474 	if (o2hb_dependent_users > 1)
2475 		goto unlock;
2476 
2477 	if (bitmap_weight(o2hb_quorum_region_bitmap,
2478 			   O2NM_MAX_REGIONS) <= O2HB_PIN_CUT_OFF)
2479 		ret = o2hb_region_pin(NULL);
2480 
2481 unlock:
2482 	spin_unlock(&o2hb_live_lock);
2483 	return ret;
2484 }
2485 
2486 void o2hb_region_dec_user(const char *region_uuid)
2487 {
2488 	spin_lock(&o2hb_live_lock);
2489 
2490 	/* local heartbeat */
2491 	if (!o2hb_global_heartbeat_active()) {
2492 	    o2hb_region_unpin(region_uuid);
2493 	    goto unlock;
2494 	}
2495 
2496 	/*
2497 	 * if global heartbeat active and there are no dependent users,
2498 	 * unpin all quorum regions
2499 	 */
2500 	o2hb_dependent_users--;
2501 	if (!o2hb_dependent_users)
2502 		o2hb_region_unpin(NULL);
2503 
2504 unlock:
2505 	spin_unlock(&o2hb_live_lock);
2506 }
2507 
2508 int o2hb_register_callback(const char *region_uuid,
2509 			   struct o2hb_callback_func *hc)
2510 {
2511 	struct o2hb_callback_func *f;
2512 	struct o2hb_callback *hbcall;
2513 	int ret;
2514 
2515 	BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
2516 	BUG_ON(!list_empty(&hc->hc_item));
2517 
2518 	hbcall = hbcall_from_type(hc->hc_type);
2519 	if (IS_ERR(hbcall)) {
2520 		ret = PTR_ERR(hbcall);
2521 		goto out;
2522 	}
2523 
2524 	if (region_uuid) {
2525 		ret = o2hb_region_inc_user(region_uuid);
2526 		if (ret) {
2527 			mlog_errno(ret);
2528 			goto out;
2529 		}
2530 	}
2531 
2532 	down_write(&o2hb_callback_sem);
2533 
2534 	list_for_each_entry(f, &hbcall->list, hc_item) {
2535 		if (hc->hc_priority < f->hc_priority) {
2536 			list_add_tail(&hc->hc_item, &f->hc_item);
2537 			break;
2538 		}
2539 	}
2540 	if (list_empty(&hc->hc_item))
2541 		list_add_tail(&hc->hc_item, &hbcall->list);
2542 
2543 	up_write(&o2hb_callback_sem);
2544 	ret = 0;
2545 out:
2546 	mlog(ML_CLUSTER, "returning %d on behalf of %p for funcs %p\n",
2547 	     ret, __builtin_return_address(0), hc);
2548 	return ret;
2549 }
2550 EXPORT_SYMBOL_GPL(o2hb_register_callback);
2551 
2552 void o2hb_unregister_callback(const char *region_uuid,
2553 			      struct o2hb_callback_func *hc)
2554 {
2555 	BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
2556 
2557 	mlog(ML_CLUSTER, "on behalf of %p for funcs %p\n",
2558 	     __builtin_return_address(0), hc);
2559 
2560 	/* XXX Can this happen _with_ a region reference? */
2561 	if (list_empty(&hc->hc_item))
2562 		return;
2563 
2564 	if (region_uuid)
2565 		o2hb_region_dec_user(region_uuid);
2566 
2567 	down_write(&o2hb_callback_sem);
2568 
2569 	list_del_init(&hc->hc_item);
2570 
2571 	up_write(&o2hb_callback_sem);
2572 }
2573 EXPORT_SYMBOL_GPL(o2hb_unregister_callback);
2574 
2575 int o2hb_check_node_heartbeating(u8 node_num)
2576 {
2577 	unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
2578 
2579 	o2hb_fill_node_map(testing_map, sizeof(testing_map));
2580 	if (!test_bit(node_num, testing_map)) {
2581 		mlog(ML_HEARTBEAT,
2582 		     "node (%u) does not have heartbeating enabled.\n",
2583 		     node_num);
2584 		return 0;
2585 	}
2586 
2587 	return 1;
2588 }
2589 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating);
2590 
2591 int o2hb_check_node_heartbeating_no_sem(u8 node_num)
2592 {
2593 	unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
2594 
2595 	spin_lock(&o2hb_live_lock);
2596 	o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
2597 	spin_unlock(&o2hb_live_lock);
2598 	if (!test_bit(node_num, testing_map)) {
2599 		mlog(ML_HEARTBEAT,
2600 		     "node (%u) does not have heartbeating enabled.\n",
2601 		     node_num);
2602 		return 0;
2603 	}
2604 
2605 	return 1;
2606 }
2607 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_no_sem);
2608 
2609 int o2hb_check_node_heartbeating_from_callback(u8 node_num)
2610 {
2611 	unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
2612 
2613 	o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
2614 	if (!test_bit(node_num, testing_map)) {
2615 		mlog(ML_HEARTBEAT,
2616 		     "node (%u) does not have heartbeating enabled.\n",
2617 		     node_num);
2618 		return 0;
2619 	}
2620 
2621 	return 1;
2622 }
2623 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_from_callback);
2624 
2625 /* Makes sure our local node is configured with a node number, and is
2626  * heartbeating. */
2627 int o2hb_check_local_node_heartbeating(void)
2628 {
2629 	u8 node_num;
2630 
2631 	/* if this node was set then we have networking */
2632 	node_num = o2nm_this_node();
2633 	if (node_num == O2NM_MAX_NODES) {
2634 		mlog(ML_HEARTBEAT, "this node has not been configured.\n");
2635 		return 0;
2636 	}
2637 
2638 	return o2hb_check_node_heartbeating(node_num);
2639 }
2640 EXPORT_SYMBOL_GPL(o2hb_check_local_node_heartbeating);
2641 
2642 /*
2643  * this is just a hack until we get the plumbing which flips file systems
2644  * read only and drops the hb ref instead of killing the node dead.
2645  */
2646 void o2hb_stop_all_regions(void)
2647 {
2648 	struct o2hb_region *reg;
2649 
2650 	mlog(ML_ERROR, "stopping heartbeat on all active regions.\n");
2651 
2652 	spin_lock(&o2hb_live_lock);
2653 
2654 	list_for_each_entry(reg, &o2hb_all_regions, hr_all_item)
2655 		reg->hr_unclean_stop = 1;
2656 
2657 	spin_unlock(&o2hb_live_lock);
2658 }
2659 EXPORT_SYMBOL_GPL(o2hb_stop_all_regions);
2660 
2661 int o2hb_get_all_regions(char *region_uuids, u8 max_regions)
2662 {
2663 	struct o2hb_region *reg;
2664 	int numregs = 0;
2665 	char *p;
2666 
2667 	spin_lock(&o2hb_live_lock);
2668 
2669 	p = region_uuids;
2670 	list_for_each_entry(reg, &o2hb_all_regions, hr_all_item) {
2671 		if (reg->hr_item_dropped)
2672 			continue;
2673 
2674 		mlog(0, "Region: %s\n", config_item_name(&reg->hr_item));
2675 		if (numregs < max_regions) {
2676 			memcpy(p, config_item_name(&reg->hr_item),
2677 			       O2HB_MAX_REGION_NAME_LEN);
2678 			p += O2HB_MAX_REGION_NAME_LEN;
2679 		}
2680 		numregs++;
2681 	}
2682 
2683 	spin_unlock(&o2hb_live_lock);
2684 
2685 	return numregs;
2686 }
2687 EXPORT_SYMBOL_GPL(o2hb_get_all_regions);
2688 
2689 int o2hb_global_heartbeat_active(void)
2690 {
2691 	return (o2hb_heartbeat_mode == O2HB_HEARTBEAT_GLOBAL);
2692 }
2693 EXPORT_SYMBOL(o2hb_global_heartbeat_active);
2694