Merge branch 'release' of git://git.kernel.org/pub/scm/linux/kernel/git/aegl/linux-2.6
[linux-2.6] / fs / ocfs2 / cluster / heartbeat.c
1 /* -*- mode: c; c-basic-offset: 8; -*-
2  * vim: noexpandtab sw=8 ts=8 sts=0:
3  *
4  * Copyright (C) 2004, 2005 Oracle.  All rights reserved.
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public
17  * License along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA.
20  */
21
22 #include <linux/kernel.h>
23 #include <linux/sched.h>
24 #include <linux/jiffies.h>
25 #include <linux/module.h>
26 #include <linux/fs.h>
27 #include <linux/bio.h>
28 #include <linux/blkdev.h>
29 #include <linux/delay.h>
30 #include <linux/file.h>
31 #include <linux/kthread.h>
32 #include <linux/configfs.h>
33 #include <linux/random.h>
34 #include <linux/crc32.h>
35 #include <linux/time.h>
36
37 #include "heartbeat.h"
38 #include "tcp.h"
39 #include "nodemanager.h"
40 #include "quorum.h"
41
42 #include "masklog.h"
43
44
45 /*
46  * The first heartbeat pass had one global thread that would serialize all hb
47  * callback calls.  This global serializing sem should only be removed once
48  * we've made sure that all callees can deal with being called concurrently
49  * from multiple hb region threads.
50  */
51 static DECLARE_RWSEM(o2hb_callback_sem);
52
53 /*
54  * multiple hb threads are watching multiple regions.  A node is live
55  * whenever any of the threads sees activity from the node in its region.
56  */
57 static DEFINE_SPINLOCK(o2hb_live_lock);
58 static struct list_head o2hb_live_slots[O2NM_MAX_NODES];
59 static unsigned long o2hb_live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
60 static LIST_HEAD(o2hb_node_events);
61 static DECLARE_WAIT_QUEUE_HEAD(o2hb_steady_queue);
62
63 static LIST_HEAD(o2hb_all_regions);
64
65 static struct o2hb_callback {
66         struct list_head list;
67 } o2hb_callbacks[O2HB_NUM_CB];
68
69 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type);
70
71 #define O2HB_DEFAULT_BLOCK_BITS       9
72
73 unsigned int o2hb_dead_threshold = O2HB_DEFAULT_DEAD_THRESHOLD;
74
75 /* Only sets a new threshold if there are no active regions. 
76  *
77  * No locking or otherwise interesting code is required for reading
78  * o2hb_dead_threshold as it can't change once regions are active and
79  * it's not interesting to anyone until then anyway. */
80 static void o2hb_dead_threshold_set(unsigned int threshold)
81 {
82         if (threshold > O2HB_MIN_DEAD_THRESHOLD) {
83                 spin_lock(&o2hb_live_lock);
84                 if (list_empty(&o2hb_all_regions))
85                         o2hb_dead_threshold = threshold;
86                 spin_unlock(&o2hb_live_lock);
87         }
88 }
89
90 struct o2hb_node_event {
91         struct list_head        hn_item;
92         enum o2hb_callback_type hn_event_type;
93         struct o2nm_node        *hn_node;
94         int                     hn_node_num;
95 };
96
97 struct o2hb_disk_slot {
98         struct o2hb_disk_heartbeat_block *ds_raw_block;
99         u8                      ds_node_num;
100         u64                     ds_last_time;
101         u64                     ds_last_generation;
102         u16                     ds_equal_samples;
103         u16                     ds_changed_samples;
104         struct list_head        ds_live_item;
105 };
106
107 /* each thread owns a region.. when we're asked to tear down the region
108  * we ask the thread to stop, who cleans up the region */
109 struct o2hb_region {
110         struct config_item      hr_item;
111
112         struct list_head        hr_all_item;
113         unsigned                hr_unclean_stop:1;
114
115         /* protected by the hr_callback_sem */
116         struct task_struct      *hr_task;
117
118         unsigned int            hr_blocks;
119         unsigned long long      hr_start_block;
120
121         unsigned int            hr_block_bits;
122         unsigned int            hr_block_bytes;
123
124         unsigned int            hr_slots_per_page;
125         unsigned int            hr_num_pages;
126
127         struct page             **hr_slot_data;
128         struct block_device     *hr_bdev;
129         struct o2hb_disk_slot   *hr_slots;
130
131         /* let the person setting up hb wait for it to return until it
132          * has reached a 'steady' state.  This will be fixed when we have
133          * a more complete api that doesn't lead to this sort of fragility. */
134         atomic_t                hr_steady_iterations;
135
136         char                    hr_dev_name[BDEVNAME_SIZE];
137
138         unsigned int            hr_timeout_ms;
139
140         /* randomized as the region goes up and down so that a node
141          * recognizes a node going up and down in one iteration */
142         u64                     hr_generation;
143
144         struct delayed_work     hr_write_timeout_work;
145         unsigned long           hr_last_timeout_start;
146
147         /* Used during o2hb_check_slot to hold a copy of the block
148          * being checked because we temporarily have to zero out the
149          * crc field. */
150         struct o2hb_disk_heartbeat_block *hr_tmp_block;
151 };
152
153 struct o2hb_bio_wait_ctxt {
154         atomic_t          wc_num_reqs;
155         struct completion wc_io_complete;
156         int               wc_error;
157 };
158
159 static void o2hb_write_timeout(struct work_struct *work)
160 {
161         struct o2hb_region *reg =
162                 container_of(work, struct o2hb_region,
163                              hr_write_timeout_work.work);
164
165         mlog(ML_ERROR, "Heartbeat write timeout to device %s after %u "
166              "milliseconds\n", reg->hr_dev_name,
167              jiffies_to_msecs(jiffies - reg->hr_last_timeout_start)); 
168         o2quo_disk_timeout();
169 }
170
171 static void o2hb_arm_write_timeout(struct o2hb_region *reg)
172 {
173         mlog(0, "Queue write timeout for %u ms\n", O2HB_MAX_WRITE_TIMEOUT_MS);
174
175         cancel_delayed_work(&reg->hr_write_timeout_work);
176         reg->hr_last_timeout_start = jiffies;
177         schedule_delayed_work(&reg->hr_write_timeout_work,
178                               msecs_to_jiffies(O2HB_MAX_WRITE_TIMEOUT_MS));
179 }
180
181 static void o2hb_disarm_write_timeout(struct o2hb_region *reg)
182 {
183         cancel_delayed_work(&reg->hr_write_timeout_work);
184         flush_scheduled_work();
185 }
186
187 static inline void o2hb_bio_wait_init(struct o2hb_bio_wait_ctxt *wc)
188 {
189         atomic_set(&wc->wc_num_reqs, 1);
190         init_completion(&wc->wc_io_complete);
191         wc->wc_error = 0;
192 }
193
194 /* Used in error paths too */
195 static inline void o2hb_bio_wait_dec(struct o2hb_bio_wait_ctxt *wc,
196                                      unsigned int num)
197 {
198         /* sadly atomic_sub_and_test() isn't available on all platforms.  The
199          * good news is that the fast path only completes one at a time */
200         while(num--) {
201                 if (atomic_dec_and_test(&wc->wc_num_reqs)) {
202                         BUG_ON(num > 0);
203                         complete(&wc->wc_io_complete);
204                 }
205         }
206 }
207
208 static void o2hb_wait_on_io(struct o2hb_region *reg,
209                             struct o2hb_bio_wait_ctxt *wc)
210 {
211         struct address_space *mapping = reg->hr_bdev->bd_inode->i_mapping;
212
213         blk_run_address_space(mapping);
214         o2hb_bio_wait_dec(wc, 1);
215
216         wait_for_completion(&wc->wc_io_complete);
217 }
218
219 static int o2hb_bio_end_io(struct bio *bio,
220                            unsigned int bytes_done,
221                            int error)
222 {
223         struct o2hb_bio_wait_ctxt *wc = bio->bi_private;
224
225         if (error) {
226                 mlog(ML_ERROR, "IO Error %d\n", error);
227                 wc->wc_error = error;
228         }
229
230         if (bio->bi_size)
231                 return 1;
232
233         o2hb_bio_wait_dec(wc, 1);
234         bio_put(bio);
235         return 0;
236 }
237
238 /* Setup a Bio to cover I/O against num_slots slots starting at
239  * start_slot. */
240 static struct bio *o2hb_setup_one_bio(struct o2hb_region *reg,
241                                       struct o2hb_bio_wait_ctxt *wc,
242                                       unsigned int *current_slot,
243                                       unsigned int max_slots)
244 {
245         int len, current_page;
246         unsigned int vec_len, vec_start;
247         unsigned int bits = reg->hr_block_bits;
248         unsigned int spp = reg->hr_slots_per_page;
249         unsigned int cs = *current_slot;
250         struct bio *bio;
251         struct page *page;
252
253         /* Testing has shown this allocation to take long enough under
254          * GFP_KERNEL that the local node can get fenced. It would be
255          * nicest if we could pre-allocate these bios and avoid this
256          * all together. */
257         bio = bio_alloc(GFP_ATOMIC, 16);
258         if (!bio) {
259                 mlog(ML_ERROR, "Could not alloc slots BIO!\n");
260                 bio = ERR_PTR(-ENOMEM);
261                 goto bail;
262         }
263
264         /* Must put everything in 512 byte sectors for the bio... */
265         bio->bi_sector = (reg->hr_start_block + cs) << (bits - 9);
266         bio->bi_bdev = reg->hr_bdev;
267         bio->bi_private = wc;
268         bio->bi_end_io = o2hb_bio_end_io;
269
270         vec_start = (cs << bits) % PAGE_CACHE_SIZE;
271         while(cs < max_slots) {
272                 current_page = cs / spp;
273                 page = reg->hr_slot_data[current_page];
274
275                 vec_len = min(PAGE_CACHE_SIZE,
276                               (max_slots-cs) * (PAGE_CACHE_SIZE/spp) );
277
278                 mlog(ML_HB_BIO, "page %d, vec_len = %u, vec_start = %u\n",
279                      current_page, vec_len, vec_start);
280
281                 len = bio_add_page(bio, page, vec_len, vec_start);
282                 if (len != vec_len) break;
283
284                 cs += vec_len / (PAGE_CACHE_SIZE/spp);
285                 vec_start = 0;
286         }
287
288 bail:
289         *current_slot = cs;
290         return bio;
291 }
292
293 static int o2hb_read_slots(struct o2hb_region *reg,
294                            unsigned int max_slots)
295 {
296         unsigned int current_slot=0;
297         int status;
298         struct o2hb_bio_wait_ctxt wc;
299         struct bio *bio;
300
301         o2hb_bio_wait_init(&wc);
302
303         while(current_slot < max_slots) {
304                 bio = o2hb_setup_one_bio(reg, &wc, &current_slot, max_slots);
305                 if (IS_ERR(bio)) {
306                         status = PTR_ERR(bio);
307                         mlog_errno(status);
308                         goto bail_and_wait;
309                 }
310
311                 atomic_inc(&wc.wc_num_reqs);
312                 submit_bio(READ, bio);
313         }
314
315         status = 0;
316
317 bail_and_wait:
318         o2hb_wait_on_io(reg, &wc);
319         if (wc.wc_error && !status)
320                 status = wc.wc_error;
321
322         return status;
323 }
324
325 static int o2hb_issue_node_write(struct o2hb_region *reg,
326                                  struct o2hb_bio_wait_ctxt *write_wc)
327 {
328         int status;
329         unsigned int slot;
330         struct bio *bio;
331
332         o2hb_bio_wait_init(write_wc);
333
334         slot = o2nm_this_node();
335
336         bio = o2hb_setup_one_bio(reg, write_wc, &slot, slot+1);
337         if (IS_ERR(bio)) {
338                 status = PTR_ERR(bio);
339                 mlog_errno(status);
340                 goto bail;
341         }
342
343         atomic_inc(&write_wc->wc_num_reqs);
344         submit_bio(WRITE, bio);
345
346         status = 0;
347 bail:
348         return status;
349 }
350
351 static u32 o2hb_compute_block_crc_le(struct o2hb_region *reg,
352                                      struct o2hb_disk_heartbeat_block *hb_block)
353 {
354         __le32 old_cksum;
355         u32 ret;
356
357         /* We want to compute the block crc with a 0 value in the
358          * hb_cksum field. Save it off here and replace after the
359          * crc. */
360         old_cksum = hb_block->hb_cksum;
361         hb_block->hb_cksum = 0;
362
363         ret = crc32_le(0, (unsigned char *) hb_block, reg->hr_block_bytes);
364
365         hb_block->hb_cksum = old_cksum;
366
367         return ret;
368 }
369
370 static void o2hb_dump_slot(struct o2hb_disk_heartbeat_block *hb_block)
371 {
372         mlog(ML_ERROR, "Dump slot information: seq = 0x%llx, node = %u, "
373              "cksum = 0x%x, generation 0x%llx\n",
374              (long long)le64_to_cpu(hb_block->hb_seq),
375              hb_block->hb_node, le32_to_cpu(hb_block->hb_cksum),
376              (long long)le64_to_cpu(hb_block->hb_generation));
377 }
378
379 static int o2hb_verify_crc(struct o2hb_region *reg,
380                            struct o2hb_disk_heartbeat_block *hb_block)
381 {
382         u32 read, computed;
383
384         read = le32_to_cpu(hb_block->hb_cksum);
385         computed = o2hb_compute_block_crc_le(reg, hb_block);
386
387         return read == computed;
388 }
389
390 /* We want to make sure that nobody is heartbeating on top of us --
391  * this will help detect an invalid configuration. */
392 static int o2hb_check_last_timestamp(struct o2hb_region *reg)
393 {
394         int node_num, ret;
395         struct o2hb_disk_slot *slot;
396         struct o2hb_disk_heartbeat_block *hb_block;
397
398         node_num = o2nm_this_node();
399
400         ret = 1;
401         slot = &reg->hr_slots[node_num];
402         /* Don't check on our 1st timestamp */
403         if (slot->ds_last_time) {
404                 hb_block = slot->ds_raw_block;
405
406                 if (le64_to_cpu(hb_block->hb_seq) != slot->ds_last_time)
407                         ret = 0;
408         }
409
410         return ret;
411 }
412
413 static inline void o2hb_prepare_block(struct o2hb_region *reg,
414                                       u64 generation)
415 {
416         int node_num;
417         u64 cputime;
418         struct o2hb_disk_slot *slot;
419         struct o2hb_disk_heartbeat_block *hb_block;
420
421         node_num = o2nm_this_node();
422         slot = &reg->hr_slots[node_num];
423
424         hb_block = (struct o2hb_disk_heartbeat_block *)slot->ds_raw_block;
425         memset(hb_block, 0, reg->hr_block_bytes);
426         /* TODO: time stuff */
427         cputime = CURRENT_TIME.tv_sec;
428         if (!cputime)
429                 cputime = 1;
430
431         hb_block->hb_seq = cpu_to_le64(cputime);
432         hb_block->hb_node = node_num;
433         hb_block->hb_generation = cpu_to_le64(generation);
434         hb_block->hb_dead_ms = cpu_to_le32(o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS);
435
436         /* This step must always happen last! */
437         hb_block->hb_cksum = cpu_to_le32(o2hb_compute_block_crc_le(reg,
438                                                                    hb_block));
439
440         mlog(ML_HB_BIO, "our node generation = 0x%llx, cksum = 0x%x\n",
441              (long long)generation,
442              le32_to_cpu(hb_block->hb_cksum));
443 }
444
445 static void o2hb_fire_callbacks(struct o2hb_callback *hbcall,
446                                 struct o2nm_node *node,
447                                 int idx)
448 {
449         struct list_head *iter;
450         struct o2hb_callback_func *f;
451
452         list_for_each(iter, &hbcall->list) {
453                 f = list_entry(iter, struct o2hb_callback_func, hc_item);
454                 mlog(ML_HEARTBEAT, "calling funcs %p\n", f);
455                 (f->hc_func)(node, idx, f->hc_data);
456         }
457 }
458
459 /* Will run the list in order until we process the passed event */
460 static void o2hb_run_event_list(struct o2hb_node_event *queued_event)
461 {
462         int empty;
463         struct o2hb_callback *hbcall;
464         struct o2hb_node_event *event;
465
466         spin_lock(&o2hb_live_lock);
467         empty = list_empty(&queued_event->hn_item);
468         spin_unlock(&o2hb_live_lock);
469         if (empty)
470                 return;
471
472         /* Holding callback sem assures we don't alter the callback
473          * lists when doing this, and serializes ourselves with other
474          * processes wanting callbacks. */
475         down_write(&o2hb_callback_sem);
476
477         spin_lock(&o2hb_live_lock);
478         while (!list_empty(&o2hb_node_events)
479                && !list_empty(&queued_event->hn_item)) {
480                 event = list_entry(o2hb_node_events.next,
481                                    struct o2hb_node_event,
482                                    hn_item);
483                 list_del_init(&event->hn_item);
484                 spin_unlock(&o2hb_live_lock);
485
486                 mlog(ML_HEARTBEAT, "Node %s event for %d\n",
487                      event->hn_event_type == O2HB_NODE_UP_CB ? "UP" : "DOWN",
488                      event->hn_node_num);
489
490                 hbcall = hbcall_from_type(event->hn_event_type);
491
492                 /* We should *never* have gotten on to the list with a
493                  * bad type... This isn't something that we should try
494                  * to recover from. */
495                 BUG_ON(IS_ERR(hbcall));
496
497                 o2hb_fire_callbacks(hbcall, event->hn_node, event->hn_node_num);
498
499                 spin_lock(&o2hb_live_lock);
500         }
501         spin_unlock(&o2hb_live_lock);
502
503         up_write(&o2hb_callback_sem);
504 }
505
506 static void o2hb_queue_node_event(struct o2hb_node_event *event,
507                                   enum o2hb_callback_type type,
508                                   struct o2nm_node *node,
509                                   int node_num)
510 {
511         assert_spin_locked(&o2hb_live_lock);
512
513         event->hn_event_type = type;
514         event->hn_node = node;
515         event->hn_node_num = node_num;
516
517         mlog(ML_HEARTBEAT, "Queue node %s event for node %d\n",
518              type == O2HB_NODE_UP_CB ? "UP" : "DOWN", node_num);
519
520         list_add_tail(&event->hn_item, &o2hb_node_events);
521 }
522
523 static void o2hb_shutdown_slot(struct o2hb_disk_slot *slot)
524 {
525         struct o2hb_node_event event =
526                 { .hn_item = LIST_HEAD_INIT(event.hn_item), };
527         struct o2nm_node *node;
528
529         node = o2nm_get_node_by_num(slot->ds_node_num);
530         if (!node)
531                 return;
532
533         spin_lock(&o2hb_live_lock);
534         if (!list_empty(&slot->ds_live_item)) {
535                 mlog(ML_HEARTBEAT, "Shutdown, node %d leaves region\n",
536                      slot->ds_node_num);
537
538                 list_del_init(&slot->ds_live_item);
539
540                 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
541                         clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
542
543                         o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
544                                               slot->ds_node_num);
545                 }
546         }
547         spin_unlock(&o2hb_live_lock);
548
549         o2hb_run_event_list(&event);
550
551         o2nm_node_put(node);
552 }
553
554 static int o2hb_check_slot(struct o2hb_region *reg,
555                            struct o2hb_disk_slot *slot)
556 {
557         int changed = 0, gen_changed = 0;
558         struct o2hb_node_event event =
559                 { .hn_item = LIST_HEAD_INIT(event.hn_item), };
560         struct o2nm_node *node;
561         struct o2hb_disk_heartbeat_block *hb_block = reg->hr_tmp_block;
562         u64 cputime;
563         unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS;
564         unsigned int slot_dead_ms;
565
566         memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes);
567
568         /* Is this correct? Do we assume that the node doesn't exist
569          * if we're not configured for him? */
570         node = o2nm_get_node_by_num(slot->ds_node_num);
571         if (!node)
572                 return 0;
573
574         if (!o2hb_verify_crc(reg, hb_block)) {
575                 /* all paths from here will drop o2hb_live_lock for
576                  * us. */
577                 spin_lock(&o2hb_live_lock);
578
579                 /* Don't print an error on the console in this case -
580                  * a freshly formatted heartbeat area will not have a
581                  * crc set on it. */
582                 if (list_empty(&slot->ds_live_item))
583                         goto out;
584
585                 /* The node is live but pushed out a bad crc. We
586                  * consider it a transient miss but don't populate any
587                  * other values as they may be junk. */
588                 mlog(ML_ERROR, "Node %d has written a bad crc to %s\n",
589                      slot->ds_node_num, reg->hr_dev_name);
590                 o2hb_dump_slot(hb_block);
591
592                 slot->ds_equal_samples++;
593                 goto fire_callbacks;
594         }
595
596         /* we don't care if these wrap.. the state transitions below
597          * clear at the right places */
598         cputime = le64_to_cpu(hb_block->hb_seq);
599         if (slot->ds_last_time != cputime)
600                 slot->ds_changed_samples++;
601         else
602                 slot->ds_equal_samples++;
603         slot->ds_last_time = cputime;
604
605         /* The node changed heartbeat generations. We assume this to
606          * mean it dropped off but came back before we timed out. We
607          * want to consider it down for the time being but don't want
608          * to lose any changed_samples state we might build up to
609          * considering it live again. */
610         if (slot->ds_last_generation != le64_to_cpu(hb_block->hb_generation)) {
611                 gen_changed = 1;
612                 slot->ds_equal_samples = 0;
613                 mlog(ML_HEARTBEAT, "Node %d changed generation (0x%llx "
614                      "to 0x%llx)\n", slot->ds_node_num,
615                      (long long)slot->ds_last_generation,
616                      (long long)le64_to_cpu(hb_block->hb_generation));
617         }
618
619         slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
620
621         mlog(ML_HEARTBEAT, "Slot %d gen 0x%llx cksum 0x%x "
622              "seq %llu last %llu changed %u equal %u\n",
623              slot->ds_node_num, (long long)slot->ds_last_generation,
624              le32_to_cpu(hb_block->hb_cksum),
625              (unsigned long long)le64_to_cpu(hb_block->hb_seq), 
626              (unsigned long long)slot->ds_last_time, slot->ds_changed_samples,
627              slot->ds_equal_samples);
628
629         spin_lock(&o2hb_live_lock);
630
631 fire_callbacks:
632         /* dead nodes only come to life after some number of
633          * changes at any time during their dead time */
634         if (list_empty(&slot->ds_live_item) &&
635             slot->ds_changed_samples >= O2HB_LIVE_THRESHOLD) {
636                 mlog(ML_HEARTBEAT, "Node %d (id 0x%llx) joined my region\n",
637                      slot->ds_node_num, (long long)slot->ds_last_generation);
638
639                 /* first on the list generates a callback */
640                 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
641                         set_bit(slot->ds_node_num, o2hb_live_node_bitmap);
642
643                         o2hb_queue_node_event(&event, O2HB_NODE_UP_CB, node,
644                                               slot->ds_node_num);
645
646                         changed = 1;
647                 }
648
649                 list_add_tail(&slot->ds_live_item,
650                               &o2hb_live_slots[slot->ds_node_num]);
651
652                 slot->ds_equal_samples = 0;
653
654                 /* We want to be sure that all nodes agree on the
655                  * number of milliseconds before a node will be
656                  * considered dead. The self-fencing timeout is
657                  * computed from this value, and a discrepancy might
658                  * result in heartbeat calling a node dead when it
659                  * hasn't self-fenced yet. */
660                 slot_dead_ms = le32_to_cpu(hb_block->hb_dead_ms);
661                 if (slot_dead_ms && slot_dead_ms != dead_ms) {
662                         /* TODO: Perhaps we can fail the region here. */
663                         mlog(ML_ERROR, "Node %d on device %s has a dead count "
664                              "of %u ms, but our count is %u ms.\n"
665                              "Please double check your configuration values "
666                              "for 'O2CB_HEARTBEAT_THRESHOLD'\n",
667                              slot->ds_node_num, reg->hr_dev_name, slot_dead_ms,
668                              dead_ms);
669                 }
670                 goto out;
671         }
672
673         /* if the list is dead, we're done.. */
674         if (list_empty(&slot->ds_live_item))
675                 goto out;
676
677         /* live nodes only go dead after enough consequtive missed
678          * samples..  reset the missed counter whenever we see
679          * activity */
680         if (slot->ds_equal_samples >= o2hb_dead_threshold || gen_changed) {
681                 mlog(ML_HEARTBEAT, "Node %d left my region\n",
682                      slot->ds_node_num);
683
684                 /* last off the live_slot generates a callback */
685                 list_del_init(&slot->ds_live_item);
686                 if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
687                         clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
688
689                         o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
690                                               slot->ds_node_num);
691
692                         changed = 1;
693                 }
694
695                 /* We don't clear this because the node is still
696                  * actually writing new blocks. */
697                 if (!gen_changed)
698                         slot->ds_changed_samples = 0;
699                 goto out;
700         }
701         if (slot->ds_changed_samples) {
702                 slot->ds_changed_samples = 0;
703                 slot->ds_equal_samples = 0;
704         }
705 out:
706         spin_unlock(&o2hb_live_lock);
707
708         o2hb_run_event_list(&event);
709
710         o2nm_node_put(node);
711         return changed;
712 }
713
714 /* This could be faster if we just implmented a find_last_bit, but I
715  * don't think the circumstances warrant it. */
716 static int o2hb_highest_node(unsigned long *nodes,
717                              int numbits)
718 {
719         int highest, node;
720
721         highest = numbits;
722         node = -1;
723         while ((node = find_next_bit(nodes, numbits, node + 1)) != -1) {
724                 if (node >= numbits)
725                         break;
726
727                 highest = node;
728         }
729
730         return highest;
731 }
732
733 static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
734 {
735         int i, ret, highest_node, change = 0;
736         unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
737         struct o2hb_bio_wait_ctxt write_wc;
738
739         ret = o2nm_configured_node_map(configured_nodes,
740                                        sizeof(configured_nodes));
741         if (ret) {
742                 mlog_errno(ret);
743                 return ret;
744         }
745
746         highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES);
747         if (highest_node >= O2NM_MAX_NODES) {
748                 mlog(ML_NOTICE, "ocfs2_heartbeat: no configured nodes found!\n");
749                 return -EINVAL;
750         }
751
752         /* No sense in reading the slots of nodes that don't exist
753          * yet. Of course, if the node definitions have holes in them
754          * then we're reading an empty slot anyway... Consider this
755          * best-effort. */
756         ret = o2hb_read_slots(reg, highest_node + 1);
757         if (ret < 0) {
758                 mlog_errno(ret);
759                 return ret;
760         }
761
762         /* With an up to date view of the slots, we can check that no
763          * other node has been improperly configured to heartbeat in
764          * our slot. */
765         if (!o2hb_check_last_timestamp(reg))
766                 mlog(ML_ERROR, "Device \"%s\": another node is heartbeating "
767                      "in our slot!\n", reg->hr_dev_name);
768
769         /* fill in the proper info for our next heartbeat */
770         o2hb_prepare_block(reg, reg->hr_generation);
771
772         /* And fire off the write. Note that we don't wait on this I/O
773          * until later. */
774         ret = o2hb_issue_node_write(reg, &write_wc);
775         if (ret < 0) {
776                 mlog_errno(ret);
777                 return ret;
778         }
779
780         i = -1;
781         while((i = find_next_bit(configured_nodes, O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
782
783                 change |= o2hb_check_slot(reg, &reg->hr_slots[i]);
784         }
785
786         /*
787          * We have to be sure we've advertised ourselves on disk
788          * before we can go to steady state.  This ensures that
789          * people we find in our steady state have seen us.
790          */
791         o2hb_wait_on_io(reg, &write_wc);
792         if (write_wc.wc_error) {
793                 /* Do not re-arm the write timeout on I/O error - we
794                  * can't be sure that the new block ever made it to
795                  * disk */
796                 mlog(ML_ERROR, "Write error %d on device \"%s\"\n",
797                      write_wc.wc_error, reg->hr_dev_name);
798                 return write_wc.wc_error;
799         }
800
801         o2hb_arm_write_timeout(reg);
802
803         /* let the person who launched us know when things are steady */
804         if (!change && (atomic_read(&reg->hr_steady_iterations) != 0)) {
805                 if (atomic_dec_and_test(&reg->hr_steady_iterations))
806                         wake_up(&o2hb_steady_queue);
807         }
808
809         return 0;
810 }
811
812 /* Subtract b from a, storing the result in a. a *must* have a larger
813  * value than b. */
814 static void o2hb_tv_subtract(struct timeval *a,
815                              struct timeval *b)
816 {
817         /* just return 0 when a is after b */
818         if (a->tv_sec < b->tv_sec ||
819             (a->tv_sec == b->tv_sec && a->tv_usec < b->tv_usec)) {
820                 a->tv_sec = 0;
821                 a->tv_usec = 0;
822                 return;
823         }
824
825         a->tv_sec -= b->tv_sec;
826         a->tv_usec -= b->tv_usec;
827         while ( a->tv_usec < 0 ) {
828                 a->tv_sec--;
829                 a->tv_usec += 1000000;
830         }
831 }
832
833 static unsigned int o2hb_elapsed_msecs(struct timeval *start,
834                                        struct timeval *end)
835 {
836         struct timeval res = *end;
837
838         o2hb_tv_subtract(&res, start);
839
840         return res.tv_sec * 1000 + res.tv_usec / 1000;
841 }
842
843 /*
844  * we ride the region ref that the region dir holds.  before the region
845  * dir is removed and drops it ref it will wait to tear down this
846  * thread.
847  */
848 static int o2hb_thread(void *data)
849 {
850         int i, ret;
851         struct o2hb_region *reg = data;
852         struct o2hb_bio_wait_ctxt write_wc;
853         struct timeval before_hb, after_hb;
854         unsigned int elapsed_msec;
855
856         mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread running\n");
857
858         set_user_nice(current, -20);
859
860         while (!kthread_should_stop() && !reg->hr_unclean_stop) {
861                 /* We track the time spent inside
862                  * o2hb_do_disk_heartbeat so that we avoid more then
863                  * hr_timeout_ms between disk writes. On busy systems
864                  * this should result in a heartbeat which is less
865                  * likely to time itself out. */
866                 do_gettimeofday(&before_hb);
867
868                 i = 0;
869                 do {
870                         ret = o2hb_do_disk_heartbeat(reg);
871                 } while (ret && ++i < 2);
872
873                 do_gettimeofday(&after_hb);
874                 elapsed_msec = o2hb_elapsed_msecs(&before_hb, &after_hb);
875
876                 mlog(0, "start = %lu.%lu, end = %lu.%lu, msec = %u\n",
877                      before_hb.tv_sec, (unsigned long) before_hb.tv_usec,
878                      after_hb.tv_sec, (unsigned long) after_hb.tv_usec,
879                      elapsed_msec);
880
881                 if (elapsed_msec < reg->hr_timeout_ms) {
882                         /* the kthread api has blocked signals for us so no
883                          * need to record the return value. */
884                         msleep_interruptible(reg->hr_timeout_ms - elapsed_msec);
885                 }
886         }
887
888         o2hb_disarm_write_timeout(reg);
889
890         /* unclean stop is only used in very bad situation */
891         for(i = 0; !reg->hr_unclean_stop && i < reg->hr_blocks; i++)
892                 o2hb_shutdown_slot(&reg->hr_slots[i]);
893
894         /* Explicit down notification - avoid forcing the other nodes
895          * to timeout on this region when we could just as easily
896          * write a clear generation - thus indicating to them that
897          * this node has left this region.
898          *
899          * XXX: Should we skip this on unclean_stop? */
900         o2hb_prepare_block(reg, 0);
901         ret = o2hb_issue_node_write(reg, &write_wc);
902         if (ret == 0) {
903                 o2hb_wait_on_io(reg, &write_wc);
904         } else {
905                 mlog_errno(ret);
906         }
907
908         mlog(ML_HEARTBEAT|ML_KTHREAD, "hb thread exiting\n");
909
910         return 0;
911 }
912
913 void o2hb_init(void)
914 {
915         int i;
916
917         for (i = 0; i < ARRAY_SIZE(o2hb_callbacks); i++)
918                 INIT_LIST_HEAD(&o2hb_callbacks[i].list);
919
920         for (i = 0; i < ARRAY_SIZE(o2hb_live_slots); i++)
921                 INIT_LIST_HEAD(&o2hb_live_slots[i]);
922
923         INIT_LIST_HEAD(&o2hb_node_events);
924
925         memset(o2hb_live_node_bitmap, 0, sizeof(o2hb_live_node_bitmap));
926 }
927
928 /* if we're already in a callback then we're already serialized by the sem */
929 static void o2hb_fill_node_map_from_callback(unsigned long *map,
930                                              unsigned bytes)
931 {
932         BUG_ON(bytes < (BITS_TO_LONGS(O2NM_MAX_NODES) * sizeof(unsigned long)));
933
934         memcpy(map, &o2hb_live_node_bitmap, bytes);
935 }
936
937 /*
938  * get a map of all nodes that are heartbeating in any regions
939  */
940 void o2hb_fill_node_map(unsigned long *map, unsigned bytes)
941 {
942         /* callers want to serialize this map and callbacks so that they
943          * can trust that they don't miss nodes coming to the party */
944         down_read(&o2hb_callback_sem);
945         spin_lock(&o2hb_live_lock);
946         o2hb_fill_node_map_from_callback(map, bytes);
947         spin_unlock(&o2hb_live_lock);
948         up_read(&o2hb_callback_sem);
949 }
950 EXPORT_SYMBOL_GPL(o2hb_fill_node_map);
951
952 /*
953  * heartbeat configfs bits.  The heartbeat set is a default set under
954  * the cluster set in nodemanager.c.
955  */
956
957 static struct o2hb_region *to_o2hb_region(struct config_item *item)
958 {
959         return item ? container_of(item, struct o2hb_region, hr_item) : NULL;
960 }
961
962 /* drop_item only drops its ref after killing the thread, nothing should
963  * be using the region anymore.  this has to clean up any state that
964  * attributes might have built up. */
965 static void o2hb_region_release(struct config_item *item)
966 {
967         int i;
968         struct page *page;
969         struct o2hb_region *reg = to_o2hb_region(item);
970
971         if (reg->hr_tmp_block)
972                 kfree(reg->hr_tmp_block);
973
974         if (reg->hr_slot_data) {
975                 for (i = 0; i < reg->hr_num_pages; i++) {
976                         page = reg->hr_slot_data[i];
977                         if (page)
978                                 __free_page(page);
979                 }
980                 kfree(reg->hr_slot_data);
981         }
982
983         if (reg->hr_bdev)
984                 blkdev_put(reg->hr_bdev);
985
986         if (reg->hr_slots)
987                 kfree(reg->hr_slots);
988
989         spin_lock(&o2hb_live_lock);
990         list_del(&reg->hr_all_item);
991         spin_unlock(&o2hb_live_lock);
992
993         kfree(reg);
994 }
995
996 static int o2hb_read_block_input(struct o2hb_region *reg,
997                                  const char *page,
998                                  size_t count,
999                                  unsigned long *ret_bytes,
1000                                  unsigned int *ret_bits)
1001 {
1002         unsigned long bytes;
1003         char *p = (char *)page;
1004
1005         bytes = simple_strtoul(p, &p, 0);
1006         if (!p || (*p && (*p != '\n')))
1007                 return -EINVAL;
1008
1009         /* Heartbeat and fs min / max block sizes are the same. */
1010         if (bytes > 4096 || bytes < 512)
1011                 return -ERANGE;
1012         if (hweight16(bytes) != 1)
1013                 return -EINVAL;
1014
1015         if (ret_bytes)
1016                 *ret_bytes = bytes;
1017         if (ret_bits)
1018                 *ret_bits = ffs(bytes) - 1;
1019
1020         return 0;
1021 }
1022
1023 static ssize_t o2hb_region_block_bytes_read(struct o2hb_region *reg,
1024                                             char *page)
1025 {
1026         return sprintf(page, "%u\n", reg->hr_block_bytes);
1027 }
1028
1029 static ssize_t o2hb_region_block_bytes_write(struct o2hb_region *reg,
1030                                              const char *page,
1031                                              size_t count)
1032 {
1033         int status;
1034         unsigned long block_bytes;
1035         unsigned int block_bits;
1036
1037         if (reg->hr_bdev)
1038                 return -EINVAL;
1039
1040         status = o2hb_read_block_input(reg, page, count,
1041                                        &block_bytes, &block_bits);
1042         if (status)
1043                 return status;
1044
1045         reg->hr_block_bytes = (unsigned int)block_bytes;
1046         reg->hr_block_bits = block_bits;
1047
1048         return count;
1049 }
1050
1051 static ssize_t o2hb_region_start_block_read(struct o2hb_region *reg,
1052                                             char *page)
1053 {
1054         return sprintf(page, "%llu\n", reg->hr_start_block);
1055 }
1056
1057 static ssize_t o2hb_region_start_block_write(struct o2hb_region *reg,
1058                                              const char *page,
1059                                              size_t count)
1060 {
1061         unsigned long long tmp;
1062         char *p = (char *)page;
1063
1064         if (reg->hr_bdev)
1065                 return -EINVAL;
1066
1067         tmp = simple_strtoull(p, &p, 0);
1068         if (!p || (*p && (*p != '\n')))
1069                 return -EINVAL;
1070
1071         reg->hr_start_block = tmp;
1072
1073         return count;
1074 }
1075
1076 static ssize_t o2hb_region_blocks_read(struct o2hb_region *reg,
1077                                        char *page)
1078 {
1079         return sprintf(page, "%d\n", reg->hr_blocks);
1080 }
1081
1082 static ssize_t o2hb_region_blocks_write(struct o2hb_region *reg,
1083                                         const char *page,
1084                                         size_t count)
1085 {
1086         unsigned long tmp;
1087         char *p = (char *)page;
1088
1089         if (reg->hr_bdev)
1090                 return -EINVAL;
1091
1092         tmp = simple_strtoul(p, &p, 0);
1093         if (!p || (*p && (*p != '\n')))
1094                 return -EINVAL;
1095
1096         if (tmp > O2NM_MAX_NODES || tmp == 0)
1097                 return -ERANGE;
1098
1099         reg->hr_blocks = (unsigned int)tmp;
1100
1101         return count;
1102 }
1103
1104 static ssize_t o2hb_region_dev_read(struct o2hb_region *reg,
1105                                     char *page)
1106 {
1107         unsigned int ret = 0;
1108
1109         if (reg->hr_bdev)
1110                 ret = sprintf(page, "%s\n", reg->hr_dev_name);
1111
1112         return ret;
1113 }
1114
1115 static void o2hb_init_region_params(struct o2hb_region *reg)
1116 {
1117         reg->hr_slots_per_page = PAGE_CACHE_SIZE >> reg->hr_block_bits;
1118         reg->hr_timeout_ms = O2HB_REGION_TIMEOUT_MS;
1119
1120         mlog(ML_HEARTBEAT, "hr_start_block = %llu, hr_blocks = %u\n",
1121              reg->hr_start_block, reg->hr_blocks);
1122         mlog(ML_HEARTBEAT, "hr_block_bytes = %u, hr_block_bits = %u\n",
1123              reg->hr_block_bytes, reg->hr_block_bits);
1124         mlog(ML_HEARTBEAT, "hr_timeout_ms = %u\n", reg->hr_timeout_ms);
1125         mlog(ML_HEARTBEAT, "dead threshold = %u\n", o2hb_dead_threshold);
1126 }
1127
1128 static int o2hb_map_slot_data(struct o2hb_region *reg)
1129 {
1130         int i, j;
1131         unsigned int last_slot;
1132         unsigned int spp = reg->hr_slots_per_page;
1133         struct page *page;
1134         char *raw;
1135         struct o2hb_disk_slot *slot;
1136
1137         reg->hr_tmp_block = kmalloc(reg->hr_block_bytes, GFP_KERNEL);
1138         if (reg->hr_tmp_block == NULL) {
1139                 mlog_errno(-ENOMEM);
1140                 return -ENOMEM;
1141         }
1142
1143         reg->hr_slots = kcalloc(reg->hr_blocks,
1144                                 sizeof(struct o2hb_disk_slot), GFP_KERNEL);
1145         if (reg->hr_slots == NULL) {
1146                 mlog_errno(-ENOMEM);
1147                 return -ENOMEM;
1148         }
1149
1150         for(i = 0; i < reg->hr_blocks; i++) {
1151                 slot = &reg->hr_slots[i];
1152                 slot->ds_node_num = i;
1153                 INIT_LIST_HEAD(&slot->ds_live_item);
1154                 slot->ds_raw_block = NULL;
1155         }
1156
1157         reg->hr_num_pages = (reg->hr_blocks + spp - 1) / spp;
1158         mlog(ML_HEARTBEAT, "Going to require %u pages to cover %u blocks "
1159                            "at %u blocks per page\n",
1160              reg->hr_num_pages, reg->hr_blocks, spp);
1161
1162         reg->hr_slot_data = kcalloc(reg->hr_num_pages, sizeof(struct page *),
1163                                     GFP_KERNEL);
1164         if (!reg->hr_slot_data) {
1165                 mlog_errno(-ENOMEM);
1166                 return -ENOMEM;
1167         }
1168
1169         for(i = 0; i < reg->hr_num_pages; i++) {
1170                 page = alloc_page(GFP_KERNEL);
1171                 if (!page) {
1172                         mlog_errno(-ENOMEM);
1173                         return -ENOMEM;
1174                 }
1175
1176                 reg->hr_slot_data[i] = page;
1177
1178                 last_slot = i * spp;
1179                 raw = page_address(page);
1180                 for (j = 0;
1181                      (j < spp) && ((j + last_slot) < reg->hr_blocks);
1182                      j++) {
1183                         BUG_ON((j + last_slot) >= reg->hr_blocks);
1184
1185                         slot = &reg->hr_slots[j + last_slot];
1186                         slot->ds_raw_block =
1187                                 (struct o2hb_disk_heartbeat_block *) raw;
1188
1189                         raw += reg->hr_block_bytes;
1190                 }
1191         }
1192
1193         return 0;
1194 }
1195
1196 /* Read in all the slots available and populate the tracking
1197  * structures so that we can start with a baseline idea of what's
1198  * there. */
1199 static int o2hb_populate_slot_data(struct o2hb_region *reg)
1200 {
1201         int ret, i;
1202         struct o2hb_disk_slot *slot;
1203         struct o2hb_disk_heartbeat_block *hb_block;
1204
1205         mlog_entry_void();
1206
1207         ret = o2hb_read_slots(reg, reg->hr_blocks);
1208         if (ret) {
1209                 mlog_errno(ret);
1210                 goto out;
1211         }
1212
1213         /* We only want to get an idea of the values initially in each
1214          * slot, so we do no verification - o2hb_check_slot will
1215          * actually determine if each configured slot is valid and
1216          * whether any values have changed. */
1217         for(i = 0; i < reg->hr_blocks; i++) {
1218                 slot = &reg->hr_slots[i];
1219                 hb_block = (struct o2hb_disk_heartbeat_block *) slot->ds_raw_block;
1220
1221                 /* Only fill the values that o2hb_check_slot uses to
1222                  * determine changing slots */
1223                 slot->ds_last_time = le64_to_cpu(hb_block->hb_seq);
1224                 slot->ds_last_generation = le64_to_cpu(hb_block->hb_generation);
1225         }
1226
1227 out:
1228         mlog_exit(ret);
1229         return ret;
1230 }
1231
1232 /* this is acting as commit; we set up all of hr_bdev and hr_task or nothing */
1233 static ssize_t o2hb_region_dev_write(struct o2hb_region *reg,
1234                                      const char *page,
1235                                      size_t count)
1236 {
1237         struct task_struct *hb_task;
1238         long fd;
1239         int sectsize;
1240         char *p = (char *)page;
1241         struct file *filp = NULL;
1242         struct inode *inode = NULL;
1243         ssize_t ret = -EINVAL;
1244
1245         if (reg->hr_bdev)
1246                 goto out;
1247
1248         /* We can't heartbeat without having had our node number
1249          * configured yet. */
1250         if (o2nm_this_node() == O2NM_MAX_NODES)
1251                 goto out;
1252
1253         fd = simple_strtol(p, &p, 0);
1254         if (!p || (*p && (*p != '\n')))
1255                 goto out;
1256
1257         if (fd < 0 || fd >= INT_MAX)
1258                 goto out;
1259
1260         filp = fget(fd);
1261         if (filp == NULL)
1262                 goto out;
1263
1264         if (reg->hr_blocks == 0 || reg->hr_start_block == 0 ||
1265             reg->hr_block_bytes == 0)
1266                 goto out;
1267
1268         inode = igrab(filp->f_mapping->host);
1269         if (inode == NULL)
1270                 goto out;
1271
1272         if (!S_ISBLK(inode->i_mode))
1273                 goto out;
1274
1275         reg->hr_bdev = I_BDEV(filp->f_mapping->host);
1276         ret = blkdev_get(reg->hr_bdev, FMODE_WRITE | FMODE_READ, 0);
1277         if (ret) {
1278                 reg->hr_bdev = NULL;
1279                 goto out;
1280         }
1281         inode = NULL;
1282
1283         bdevname(reg->hr_bdev, reg->hr_dev_name);
1284
1285         sectsize = bdev_hardsect_size(reg->hr_bdev);
1286         if (sectsize != reg->hr_block_bytes) {
1287                 mlog(ML_ERROR,
1288                      "blocksize %u incorrect for device, expected %d",
1289                      reg->hr_block_bytes, sectsize);
1290                 ret = -EINVAL;
1291                 goto out;
1292         }
1293
1294         o2hb_init_region_params(reg);
1295
1296         /* Generation of zero is invalid */
1297         do {
1298                 get_random_bytes(&reg->hr_generation,
1299                                  sizeof(reg->hr_generation));
1300         } while (reg->hr_generation == 0);
1301
1302         ret = o2hb_map_slot_data(reg);
1303         if (ret) {
1304                 mlog_errno(ret);
1305                 goto out;
1306         }
1307
1308         ret = o2hb_populate_slot_data(reg);
1309         if (ret) {
1310                 mlog_errno(ret);
1311                 goto out;
1312         }
1313
1314         INIT_DELAYED_WORK(&reg->hr_write_timeout_work, o2hb_write_timeout);
1315
1316         /*
1317          * A node is considered live after it has beat LIVE_THRESHOLD
1318          * times.  We're not steady until we've given them a chance
1319          * _after_ our first read.
1320          */
1321         atomic_set(&reg->hr_steady_iterations, O2HB_LIVE_THRESHOLD + 1);
1322
1323         hb_task = kthread_run(o2hb_thread, reg, "o2hb-%s",
1324                               reg->hr_item.ci_name);
1325         if (IS_ERR(hb_task)) {
1326                 ret = PTR_ERR(hb_task);
1327                 mlog_errno(ret);
1328                 goto out;
1329         }
1330
1331         spin_lock(&o2hb_live_lock);
1332         reg->hr_task = hb_task;
1333         spin_unlock(&o2hb_live_lock);
1334
1335         ret = wait_event_interruptible(o2hb_steady_queue,
1336                                 atomic_read(&reg->hr_steady_iterations) == 0);
1337         if (ret) {
1338                 /* We got interrupted (hello ptrace!).  Clean up */
1339                 spin_lock(&o2hb_live_lock);
1340                 hb_task = reg->hr_task;
1341                 reg->hr_task = NULL;
1342                 spin_unlock(&o2hb_live_lock);
1343
1344                 if (hb_task)
1345                         kthread_stop(hb_task);
1346                 goto out;
1347         }
1348
1349         /* Ok, we were woken.  Make sure it wasn't by drop_item() */
1350         spin_lock(&o2hb_live_lock);
1351         hb_task = reg->hr_task;
1352         spin_unlock(&o2hb_live_lock);
1353
1354         if (hb_task)
1355                 ret = count;
1356         else
1357                 ret = -EIO;
1358
1359 out:
1360         if (filp)
1361                 fput(filp);
1362         if (inode)
1363                 iput(inode);
1364         if (ret < 0) {
1365                 if (reg->hr_bdev) {
1366                         blkdev_put(reg->hr_bdev);
1367                         reg->hr_bdev = NULL;
1368                 }
1369         }
1370         return ret;
1371 }
1372
1373 static ssize_t o2hb_region_pid_read(struct o2hb_region *reg,
1374                                       char *page)
1375 {
1376         pid_t pid = 0;
1377
1378         spin_lock(&o2hb_live_lock);
1379         if (reg->hr_task)
1380                 pid = reg->hr_task->pid;
1381         spin_unlock(&o2hb_live_lock);
1382
1383         if (!pid)
1384                 return 0;
1385
1386         return sprintf(page, "%u\n", pid);
1387 }
1388
1389 struct o2hb_region_attribute {
1390         struct configfs_attribute attr;
1391         ssize_t (*show)(struct o2hb_region *, char *);
1392         ssize_t (*store)(struct o2hb_region *, const char *, size_t);
1393 };
1394
1395 static struct o2hb_region_attribute o2hb_region_attr_block_bytes = {
1396         .attr   = { .ca_owner = THIS_MODULE,
1397                     .ca_name = "block_bytes",
1398                     .ca_mode = S_IRUGO | S_IWUSR },
1399         .show   = o2hb_region_block_bytes_read,
1400         .store  = o2hb_region_block_bytes_write,
1401 };
1402
1403 static struct o2hb_region_attribute o2hb_region_attr_start_block = {
1404         .attr   = { .ca_owner = THIS_MODULE,
1405                     .ca_name = "start_block",
1406                     .ca_mode = S_IRUGO | S_IWUSR },
1407         .show   = o2hb_region_start_block_read,
1408         .store  = o2hb_region_start_block_write,
1409 };
1410
1411 static struct o2hb_region_attribute o2hb_region_attr_blocks = {
1412         .attr   = { .ca_owner = THIS_MODULE,
1413                     .ca_name = "blocks",
1414                     .ca_mode = S_IRUGO | S_IWUSR },
1415         .show   = o2hb_region_blocks_read,
1416         .store  = o2hb_region_blocks_write,
1417 };
1418
1419 static struct o2hb_region_attribute o2hb_region_attr_dev = {
1420         .attr   = { .ca_owner = THIS_MODULE,
1421                     .ca_name = "dev",
1422                     .ca_mode = S_IRUGO | S_IWUSR },
1423         .show   = o2hb_region_dev_read,
1424         .store  = o2hb_region_dev_write,
1425 };
1426
1427 static struct o2hb_region_attribute o2hb_region_attr_pid = {
1428        .attr   = { .ca_owner = THIS_MODULE,
1429                    .ca_name = "pid",
1430                    .ca_mode = S_IRUGO | S_IRUSR },
1431        .show   = o2hb_region_pid_read,
1432 };
1433
1434 static struct configfs_attribute *o2hb_region_attrs[] = {
1435         &o2hb_region_attr_block_bytes.attr,
1436         &o2hb_region_attr_start_block.attr,
1437         &o2hb_region_attr_blocks.attr,
1438         &o2hb_region_attr_dev.attr,
1439         &o2hb_region_attr_pid.attr,
1440         NULL,
1441 };
1442
1443 static ssize_t o2hb_region_show(struct config_item *item,
1444                                 struct configfs_attribute *attr,
1445                                 char *page)
1446 {
1447         struct o2hb_region *reg = to_o2hb_region(item);
1448         struct o2hb_region_attribute *o2hb_region_attr =
1449                 container_of(attr, struct o2hb_region_attribute, attr);
1450         ssize_t ret = 0;
1451
1452         if (o2hb_region_attr->show)
1453                 ret = o2hb_region_attr->show(reg, page);
1454         return ret;
1455 }
1456
1457 static ssize_t o2hb_region_store(struct config_item *item,
1458                                  struct configfs_attribute *attr,
1459                                  const char *page, size_t count)
1460 {
1461         struct o2hb_region *reg = to_o2hb_region(item);
1462         struct o2hb_region_attribute *o2hb_region_attr =
1463                 container_of(attr, struct o2hb_region_attribute, attr);
1464         ssize_t ret = -EINVAL;
1465
1466         if (o2hb_region_attr->store)
1467                 ret = o2hb_region_attr->store(reg, page, count);
1468         return ret;
1469 }
1470
1471 static struct configfs_item_operations o2hb_region_item_ops = {
1472         .release                = o2hb_region_release,
1473         .show_attribute         = o2hb_region_show,
1474         .store_attribute        = o2hb_region_store,
1475 };
1476
1477 static struct config_item_type o2hb_region_type = {
1478         .ct_item_ops    = &o2hb_region_item_ops,
1479         .ct_attrs       = o2hb_region_attrs,
1480         .ct_owner       = THIS_MODULE,
1481 };
1482
1483 /* heartbeat set */
1484
1485 struct o2hb_heartbeat_group {
1486         struct config_group hs_group;
1487         /* some stuff? */
1488 };
1489
1490 static struct o2hb_heartbeat_group *to_o2hb_heartbeat_group(struct config_group *group)
1491 {
1492         return group ?
1493                 container_of(group, struct o2hb_heartbeat_group, hs_group)
1494                 : NULL;
1495 }
1496
1497 static struct config_item *o2hb_heartbeat_group_make_item(struct config_group *group,
1498                                                           const char *name)
1499 {
1500         struct o2hb_region *reg = NULL;
1501         struct config_item *ret = NULL;
1502
1503         reg = kzalloc(sizeof(struct o2hb_region), GFP_KERNEL);
1504         if (reg == NULL)
1505                 goto out; /* ENOMEM */
1506
1507         config_item_init_type_name(&reg->hr_item, name, &o2hb_region_type);
1508
1509         ret = &reg->hr_item;
1510
1511         spin_lock(&o2hb_live_lock);
1512         list_add_tail(&reg->hr_all_item, &o2hb_all_regions);
1513         spin_unlock(&o2hb_live_lock);
1514 out:
1515         if (ret == NULL)
1516                 kfree(reg);
1517
1518         return ret;
1519 }
1520
1521 static void o2hb_heartbeat_group_drop_item(struct config_group *group,
1522                                            struct config_item *item)
1523 {
1524         struct task_struct *hb_task;
1525         struct o2hb_region *reg = to_o2hb_region(item);
1526
1527         /* stop the thread when the user removes the region dir */
1528         spin_lock(&o2hb_live_lock);
1529         hb_task = reg->hr_task;
1530         reg->hr_task = NULL;
1531         spin_unlock(&o2hb_live_lock);
1532
1533         if (hb_task)
1534                 kthread_stop(hb_task);
1535
1536         /*
1537          * If we're racing a dev_write(), we need to wake them.  They will
1538          * check reg->hr_task
1539          */
1540         if (atomic_read(&reg->hr_steady_iterations) != 0) {
1541                 atomic_set(&reg->hr_steady_iterations, 0);
1542                 wake_up(&o2hb_steady_queue);
1543         }
1544
1545         config_item_put(item);
1546 }
1547
1548 struct o2hb_heartbeat_group_attribute {
1549         struct configfs_attribute attr;
1550         ssize_t (*show)(struct o2hb_heartbeat_group *, char *);
1551         ssize_t (*store)(struct o2hb_heartbeat_group *, const char *, size_t);
1552 };
1553
1554 static ssize_t o2hb_heartbeat_group_show(struct config_item *item,
1555                                          struct configfs_attribute *attr,
1556                                          char *page)
1557 {
1558         struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item));
1559         struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr =
1560                 container_of(attr, struct o2hb_heartbeat_group_attribute, attr);
1561         ssize_t ret = 0;
1562
1563         if (o2hb_heartbeat_group_attr->show)
1564                 ret = o2hb_heartbeat_group_attr->show(reg, page);
1565         return ret;
1566 }
1567
1568 static ssize_t o2hb_heartbeat_group_store(struct config_item *item,
1569                                           struct configfs_attribute *attr,
1570                                           const char *page, size_t count)
1571 {
1572         struct o2hb_heartbeat_group *reg = to_o2hb_heartbeat_group(to_config_group(item));
1573         struct o2hb_heartbeat_group_attribute *o2hb_heartbeat_group_attr =
1574                 container_of(attr, struct o2hb_heartbeat_group_attribute, attr);
1575         ssize_t ret = -EINVAL;
1576
1577         if (o2hb_heartbeat_group_attr->store)
1578                 ret = o2hb_heartbeat_group_attr->store(reg, page, count);
1579         return ret;
1580 }
1581
1582 static ssize_t o2hb_heartbeat_group_threshold_show(struct o2hb_heartbeat_group *group,
1583                                                      char *page)
1584 {
1585         return sprintf(page, "%u\n", o2hb_dead_threshold);
1586 }
1587
1588 static ssize_t o2hb_heartbeat_group_threshold_store(struct o2hb_heartbeat_group *group,
1589                                                     const char *page,
1590                                                     size_t count)
1591 {
1592         unsigned long tmp;
1593         char *p = (char *)page;
1594
1595         tmp = simple_strtoul(p, &p, 10);
1596         if (!p || (*p && (*p != '\n')))
1597                 return -EINVAL;
1598
1599         /* this will validate ranges for us. */
1600         o2hb_dead_threshold_set((unsigned int) tmp);
1601
1602         return count;
1603 }
1604
1605 static struct o2hb_heartbeat_group_attribute o2hb_heartbeat_group_attr_threshold = {
1606         .attr   = { .ca_owner = THIS_MODULE,
1607                     .ca_name = "dead_threshold",
1608                     .ca_mode = S_IRUGO | S_IWUSR },
1609         .show   = o2hb_heartbeat_group_threshold_show,
1610         .store  = o2hb_heartbeat_group_threshold_store,
1611 };
1612
1613 static struct configfs_attribute *o2hb_heartbeat_group_attrs[] = {
1614         &o2hb_heartbeat_group_attr_threshold.attr,
1615         NULL,
1616 };
1617
1618 static struct configfs_item_operations o2hb_hearbeat_group_item_ops = {
1619         .show_attribute         = o2hb_heartbeat_group_show,
1620         .store_attribute        = o2hb_heartbeat_group_store,
1621 };
1622
1623 static struct configfs_group_operations o2hb_heartbeat_group_group_ops = {
1624         .make_item      = o2hb_heartbeat_group_make_item,
1625         .drop_item      = o2hb_heartbeat_group_drop_item,
1626 };
1627
1628 static struct config_item_type o2hb_heartbeat_group_type = {
1629         .ct_group_ops   = &o2hb_heartbeat_group_group_ops,
1630         .ct_item_ops    = &o2hb_hearbeat_group_item_ops,
1631         .ct_attrs       = o2hb_heartbeat_group_attrs,
1632         .ct_owner       = THIS_MODULE,
1633 };
1634
1635 /* this is just here to avoid touching group in heartbeat.h which the
1636  * entire damn world #includes */
1637 struct config_group *o2hb_alloc_hb_set(void)
1638 {
1639         struct o2hb_heartbeat_group *hs = NULL;
1640         struct config_group *ret = NULL;
1641
1642         hs = kzalloc(sizeof(struct o2hb_heartbeat_group), GFP_KERNEL);
1643         if (hs == NULL)
1644                 goto out;
1645
1646         config_group_init_type_name(&hs->hs_group, "heartbeat",
1647                                     &o2hb_heartbeat_group_type);
1648
1649         ret = &hs->hs_group;
1650 out:
1651         if (ret == NULL)
1652                 kfree(hs);
1653         return ret;
1654 }
1655
1656 void o2hb_free_hb_set(struct config_group *group)
1657 {
1658         struct o2hb_heartbeat_group *hs = to_o2hb_heartbeat_group(group);
1659         kfree(hs);
1660 }
1661
1662 /* hb callback registration and issueing */
1663
1664 static struct o2hb_callback *hbcall_from_type(enum o2hb_callback_type type)
1665 {
1666         if (type == O2HB_NUM_CB)
1667                 return ERR_PTR(-EINVAL);
1668
1669         return &o2hb_callbacks[type];
1670 }
1671
1672 void o2hb_setup_callback(struct o2hb_callback_func *hc,
1673                          enum o2hb_callback_type type,
1674                          o2hb_cb_func *func,
1675                          void *data,
1676                          int priority)
1677 {
1678         INIT_LIST_HEAD(&hc->hc_item);
1679         hc->hc_func = func;
1680         hc->hc_data = data;
1681         hc->hc_priority = priority;
1682         hc->hc_type = type;
1683         hc->hc_magic = O2HB_CB_MAGIC;
1684 }
1685 EXPORT_SYMBOL_GPL(o2hb_setup_callback);
1686
1687 static struct o2hb_region *o2hb_find_region(const char *region_uuid)
1688 {
1689         struct o2hb_region *p, *reg = NULL;
1690
1691         assert_spin_locked(&o2hb_live_lock);
1692
1693         list_for_each_entry(p, &o2hb_all_regions, hr_all_item) {
1694                 if (!strcmp(region_uuid, config_item_name(&p->hr_item))) {
1695                         reg = p;
1696                         break;
1697                 }
1698         }
1699
1700         return reg;
1701 }
1702
1703 static int o2hb_region_get(const char *region_uuid)
1704 {
1705         int ret = 0;
1706         struct o2hb_region *reg;
1707
1708         spin_lock(&o2hb_live_lock);
1709
1710         reg = o2hb_find_region(region_uuid);
1711         if (!reg)
1712                 ret = -ENOENT;
1713         spin_unlock(&o2hb_live_lock);
1714
1715         if (ret)
1716                 goto out;
1717
1718         ret = o2nm_depend_this_node();
1719         if (ret)
1720                 goto out;
1721
1722         ret = o2nm_depend_item(&reg->hr_item);
1723         if (ret)
1724                 o2nm_undepend_this_node();
1725
1726 out:
1727         return ret;
1728 }
1729
1730 static void o2hb_region_put(const char *region_uuid)
1731 {
1732         struct o2hb_region *reg;
1733
1734         spin_lock(&o2hb_live_lock);
1735
1736         reg = o2hb_find_region(region_uuid);
1737
1738         spin_unlock(&o2hb_live_lock);
1739
1740         if (reg) {
1741                 o2nm_undepend_item(&reg->hr_item);
1742                 o2nm_undepend_this_node();
1743         }
1744 }
1745
1746 int o2hb_register_callback(const char *region_uuid,
1747                            struct o2hb_callback_func *hc)
1748 {
1749         struct o2hb_callback_func *tmp;
1750         struct list_head *iter;
1751         struct o2hb_callback *hbcall;
1752         int ret;
1753
1754         BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
1755         BUG_ON(!list_empty(&hc->hc_item));
1756
1757         hbcall = hbcall_from_type(hc->hc_type);
1758         if (IS_ERR(hbcall)) {
1759                 ret = PTR_ERR(hbcall);
1760                 goto out;
1761         }
1762
1763         if (region_uuid) {
1764                 ret = o2hb_region_get(region_uuid);
1765                 if (ret)
1766                         goto out;
1767         }
1768
1769         down_write(&o2hb_callback_sem);
1770
1771         list_for_each(iter, &hbcall->list) {
1772                 tmp = list_entry(iter, struct o2hb_callback_func, hc_item);
1773                 if (hc->hc_priority < tmp->hc_priority) {
1774                         list_add_tail(&hc->hc_item, iter);
1775                         break;
1776                 }
1777         }
1778         if (list_empty(&hc->hc_item))
1779                 list_add_tail(&hc->hc_item, &hbcall->list);
1780
1781         up_write(&o2hb_callback_sem);
1782         ret = 0;
1783 out:
1784         mlog(ML_HEARTBEAT, "returning %d on behalf of %p for funcs %p\n",
1785              ret, __builtin_return_address(0), hc);
1786         return ret;
1787 }
1788 EXPORT_SYMBOL_GPL(o2hb_register_callback);
1789
1790 void o2hb_unregister_callback(const char *region_uuid,
1791                               struct o2hb_callback_func *hc)
1792 {
1793         BUG_ON(hc->hc_magic != O2HB_CB_MAGIC);
1794
1795         mlog(ML_HEARTBEAT, "on behalf of %p for funcs %p\n",
1796              __builtin_return_address(0), hc);
1797
1798         /* XXX Can this happen _with_ a region reference? */
1799         if (list_empty(&hc->hc_item))
1800                 return;
1801
1802         if (region_uuid)
1803                 o2hb_region_put(region_uuid);
1804
1805         down_write(&o2hb_callback_sem);
1806
1807         list_del_init(&hc->hc_item);
1808
1809         up_write(&o2hb_callback_sem);
1810 }
1811 EXPORT_SYMBOL_GPL(o2hb_unregister_callback);
1812
1813 int o2hb_check_node_heartbeating(u8 node_num)
1814 {
1815         unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1816
1817         o2hb_fill_node_map(testing_map, sizeof(testing_map));
1818         if (!test_bit(node_num, testing_map)) {
1819                 mlog(ML_HEARTBEAT,
1820                      "node (%u) does not have heartbeating enabled.\n",
1821                      node_num);
1822                 return 0;
1823         }
1824
1825         return 1;
1826 }
1827 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating);
1828
1829 int o2hb_check_node_heartbeating_from_callback(u8 node_num)
1830 {
1831         unsigned long testing_map[BITS_TO_LONGS(O2NM_MAX_NODES)];
1832
1833         o2hb_fill_node_map_from_callback(testing_map, sizeof(testing_map));
1834         if (!test_bit(node_num, testing_map)) {
1835                 mlog(ML_HEARTBEAT,
1836                      "node (%u) does not have heartbeating enabled.\n",
1837                      node_num);
1838                 return 0;
1839         }
1840
1841         return 1;
1842 }
1843 EXPORT_SYMBOL_GPL(o2hb_check_node_heartbeating_from_callback);
1844
1845 /* Makes sure our local node is configured with a node number, and is
1846  * heartbeating. */
1847 int o2hb_check_local_node_heartbeating(void)
1848 {
1849         u8 node_num;
1850
1851         /* if this node was set then we have networking */
1852         node_num = o2nm_this_node();
1853         if (node_num == O2NM_MAX_NODES) {
1854                 mlog(ML_HEARTBEAT, "this node has not been configured.\n");
1855                 return 0;
1856         }
1857
1858         return o2hb_check_node_heartbeating(node_num);
1859 }
1860 EXPORT_SYMBOL_GPL(o2hb_check_local_node_heartbeating);
1861
1862 /*
1863  * this is just a hack until we get the plumbing which flips file systems
1864  * read only and drops the hb ref instead of killing the node dead.
1865  */
1866 void o2hb_stop_all_regions(void)
1867 {
1868         struct o2hb_region *reg;
1869
1870         mlog(ML_ERROR, "stopping heartbeat on all active regions.\n");
1871
1872         spin_lock(&o2hb_live_lock);
1873
1874         list_for_each_entry(reg, &o2hb_all_regions, hr_all_item)
1875                 reg->hr_unclean_stop = 1;
1876
1877         spin_unlock(&o2hb_live_lock);
1878 }
1879 EXPORT_SYMBOL_GPL(o2hb_stop_all_regions);